diff --git a/CHANGELOG.md b/CHANGELOG.md index fb0c49613e7..f0967068146 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -305,7 +305,7 @@ to pay for the storage of their accounts. `sum(rate(near_peer_message_received_by_type_total{...}[5m]))`. [#7548](https://github.com/near/nearcore/pull/7548) * Few changes to `view_state` JSON RPC query: - - The requset has now an optional `include_proof` argument. When set to + - The request has now an optional `include_proof` argument. When set to `true`, response’s `proof` will be populated. - The `proof` within each value in `values` list of a `view_state` response is now deprecated and will be removed in the future. Client code should ignore diff --git a/Cargo.lock b/Cargo.lock index c2a49977688..8c2bf77bf3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3335,6 +3335,7 @@ dependencies = [ "near-config-utils", "near-indexer", "near-o11y", + "near-primitives", "openssl-probe", "serde_json", "tokio", @@ -5503,6 +5504,7 @@ dependencies = [ "testlib", "thiserror 2.0.0", "tokio", + "tokio-stream", "tracing", "xz2", ] diff --git a/benchmarks/continuous/db/tool/orm/migrations/2024-06-19-160030_ft_transfers_metadata/up.sql b/benchmarks/continuous/db/tool/orm/migrations/2024-06-19-160030_ft_transfers_metadata/up.sql index e91bf893b76..87aa6a4f1d4 100644 --- a/benchmarks/continuous/db/tool/orm/migrations/2024-06-19-160030_ft_transfers_metadata/up.sql +++ b/benchmarks/continuous/db/tool/orm/migrations/2024-06-19-160030_ft_transfers_metadata/up.sql @@ -1,4 +1,4 @@ --- Setting defaul values to enable `not null` by filling values for existing +-- Setting default values to enable `not null` by filling values for existing -- rows. alter table ft_transfers add column initiator text not null default 'crt-benchmarks', diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 77fa1a00aa8..b4453ed332e 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -795,28 +795,27 @@ impl Chain { fn get_state_sync_info( &self, me: &Option, - epoch_first_block: &Block, + epoch_id: &EpochId, + block_hash: &CryptoHash, + prev_hash: &CryptoHash, + prev_prev_hash: &CryptoHash, ) -> Result, Error> { - let prev_hash = *epoch_first_block.header().prev_hash(); let shards_to_state_sync = Chain::get_shards_to_state_sync( self.epoch_manager.as_ref(), &self.shard_tracker, me, - &prev_hash, + prev_hash, + prev_prev_hash, )?; if shards_to_state_sync.is_empty() { Ok(None) } else { debug!(target: "chain", "Downloading state for {:?}, I'm {:?}", shards_to_state_sync, me); - let epoch_id = epoch_first_block.header().epoch_id(); let protocol_version = self.epoch_manager.get_epoch_protocol_version(epoch_id)?; // Note that this block is the first block in an epoch because this function is only called // in get_catchup_and_state_sync_infos() when that is the case. - let state_sync_info = StateSyncInfo::new( - protocol_version, - *epoch_first_block.header().hash(), - shards_to_state_sync, - ); + let state_sync_info = + StateSyncInfo::new(protocol_version, *block_hash, shards_to_state_sync); Ok(Some(state_sync_info)) } } @@ -2024,7 +2023,6 @@ impl Chain { tracing::debug!(target: "chain", ?shard_id, need_storage_update, "Updating storage"); if need_storage_update { - // TODO(resharding): consider adding to catchup flow. self.resharding_manager.start_resharding( self.chain_store.store_update(), &block, @@ -2272,7 +2270,7 @@ impl Chain { // First real I/O expense. 
let prev = self.get_previous_header(header)?; let prev_hash = *prev.hash(); - let prev_prev_hash = *prev.prev_hash(); + let prev_prev_hash = prev.prev_hash(); let gas_price = prev.next_gas_price(); let prev_random_value = *prev.random_value(); let prev_height = prev.height(); @@ -2282,8 +2280,13 @@ impl Chain { return Err(Error::InvalidBlockHeight(prev_height)); } - let (is_caught_up, state_sync_info) = - self.get_catchup_and_state_sync_infos(header, prev_hash, prev_prev_hash, me, block)?; + let (is_caught_up, state_sync_info) = self.get_catchup_and_state_sync_infos( + header.epoch_id(), + header.hash(), + &prev_hash, + prev_prev_hash, + me, + )?; self.check_if_challenged_block_on_chain(header)?; @@ -2376,29 +2379,32 @@ impl Chain { fn get_catchup_and_state_sync_infos( &self, - header: &BlockHeader, - prev_hash: CryptoHash, - prev_prev_hash: CryptoHash, + epoch_id: &EpochId, + block_hash: &CryptoHash, + prev_hash: &CryptoHash, + prev_prev_hash: &CryptoHash, me: &Option, - block: &MaybeValidated, ) -> Result<(bool, Option), Error> { - if self.epoch_manager.is_next_block_epoch_start(&prev_hash)? { - debug!(target: "chain", block_hash=?header.hash(), "block is the first block of an epoch"); - if !self.prev_block_is_caught_up(&prev_prev_hash, &prev_hash)? { - // The previous block is not caught up for the next epoch relative to the previous - // block, which is the current epoch for this block, so this block cannot be applied - // at all yet, needs to be orphaned - return Err(Error::Orphan); - } - - // For the first block of the epoch we check if we need to start download states for - // shards that we will care about in the next epoch. If there is no state to be downloaded, - // we consider that we are caught up, otherwise not - let state_sync_info = self.get_state_sync_info(me, block)?; - Ok((state_sync_info.is_none(), state_sync_info)) - } else { - Ok((self.prev_block_is_caught_up(&prev_prev_hash, &prev_hash)?, None)) + if !self.epoch_manager.is_next_block_epoch_start(prev_hash)? { + return Ok((self.prev_block_is_caught_up(prev_prev_hash, prev_hash)?, None)); } + if !self.prev_block_is_caught_up(prev_prev_hash, prev_hash)? { + // The previous block is not caught up for the next epoch relative to the previous + // block, which is the current epoch for this block, so this block cannot be applied + // at all yet, needs to be orphaned + return Err(Error::Orphan); + } + + // For the first block of the epoch we check if we need to start download states for + // shards that we will care about in the next epoch. If there is no state to be downloaded, + // we consider that we are caught up, otherwise not + let state_sync_info = + self.get_state_sync_info(me, epoch_id, block_hash, prev_hash, prev_prev_hash)?; + debug!( + target: "chain", %block_hash, shards_to_sync=?state_sync_info.as_ref().map(|s| s.shards()), + "Checked for shards to sync for epoch T+1 upon processing first block of epoch T" + ); + Ok((state_sync_info.is_none(), state_sync_info)) } pub fn prev_block_is_caught_up( @@ -2426,56 +2432,71 @@ impl Chain { shard_tracker: &ShardTracker, me: &Option, parent_hash: &CryptoHash, + prev_prev_hash: &CryptoHash, ) -> Result, Error> { let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash)?; - Ok((epoch_manager.shard_ids(&epoch_id)?) - .into_iter() - .filter(|shard_id| { - Self::should_catch_up_shard( - epoch_manager, - shard_tracker, - me, - parent_hash, - *shard_id, - ) - }) - .collect()) + let mut shards_to_sync = Vec::new(); + for shard_id in epoch_manager.shard_ids(&epoch_id)? 
{ + if Self::should_catch_up_shard( + epoch_manager, + shard_tracker, + me, + &epoch_id, + parent_hash, + prev_prev_hash, + shard_id, + )? { + shards_to_sync.push(shard_id) + } + } + Ok(shards_to_sync) } + /// Returns whether we need to initiate state sync for the given `shard_id` for the epoch + /// beginning after the block `epoch_last_block`. If that epoch is epoch T, the logic is: + /// - will track the shard in epoch T+1 + /// - AND not tracking it in T + /// - AND didn't track it in T-1 + /// We check that we didn't track it in T-1 because if so, and we're in the relatively rare case + /// where we'll go from tracking it to not tracking it and back to tracking it in consecutive epochs, + /// then we can just continue to apply chunks as if we were tracking it in epoch T, and there's no need to state sync. fn should_catch_up_shard( epoch_manager: &dyn EpochManagerAdapter, shard_tracker: &ShardTracker, me: &Option, - parent_hash: &CryptoHash, + epoch_id: &EpochId, + epoch_last_block: &CryptoHash, + epoch_last_block_prev: &CryptoHash, shard_id: ShardId, - ) -> bool { - let result = epoch_manager.will_shard_layout_change(parent_hash); - let will_shard_layout_change = match result { - Ok(_will_shard_layout_change) => { - // TODO(#11881): before state sync is fixed, we don't catch up - // split shards. Assume that all needed shards are tracked - // already. - // will_shard_layout_change, - false - } - Err(err) => { - // TODO(resharding) This is a problem, if this happens the node - // will not perform resharding and fall behind the network. - tracing::error!(target: "chain", ?err, "failed to check if shard layout will change"); - false - } - }; - // if shard layout will change the next epoch, we should catch up the shard regardless - // whether we already have the shard's state this epoch, because we need to generate - // new states for shards split from the current shard for the next epoch - let will_care_about_shard = - shard_tracker.will_care_about_shard(me.as_ref(), parent_hash, shard_id, true); - let does_care_about_shard = - shard_tracker.care_about_shard(me.as_ref(), parent_hash, shard_id, true); + ) -> Result { + // Won't care about it next epoch, no need to state sync it. + if !shard_tracker.will_care_about_shard(me.as_ref(), epoch_last_block, shard_id, true) { + return Ok(false); + } + // Currently tracking the shard, so no need to state sync it. + if shard_tracker.care_about_shard(me.as_ref(), epoch_last_block, shard_id, true) { + return Ok(false); + } - tracing::debug!(target: "chain", does_care_about_shard, will_care_about_shard, will_shard_layout_change, "should catch up shard"); + // Now we need to state sync it unless we were tracking the parent in the previous epoch, + // in which case we don't need to because we already have the state, and can just continue applying chunks + if epoch_id == &EpochId::default() { + return Ok(true); + } - will_care_about_shard && (will_shard_layout_change || !does_care_about_shard) + let (_layout, parent_shard_id, _index) = + epoch_manager.get_prev_shard_id_from_prev_hash(epoch_last_block, shard_id)?; + // Note that here passing `epoch_last_block_prev` to care_about_shard() will have us check whether we were tracking it in + // the previous epoch, because it is the "parent_hash" of the last block of the previous epoch. + // TODO: consider refactoring these ShardTracker functions to accept an epoch_id + // to make this less tricky. 
+ let tracked_before = shard_tracker.care_about_shard( + me.as_ref(), + epoch_last_block_prev, + parent_shard_id, + true, + ); + Ok(!tracked_before) } /// Check if any block with missing chunk is ready to be processed and start processing these blocks @@ -3127,8 +3148,8 @@ impl Chain { } blocks_catch_up_state.done_blocks.push(queued_block); } - Err(_) => { - error!("Error processing block during catch up, retrying"); + Err(err) => { + error!(target: "chain", ?err, "Error processing block during catch up, retrying"); blocks_catch_up_state.pending_blocks.push(queued_block); } } diff --git a/chain/chain/src/flat_storage_resharder.rs b/chain/chain/src/flat_storage_resharder.rs index 47e78ca8ebc..83a9434115d 100644 --- a/chain/chain/src/flat_storage_resharder.rs +++ b/chain/chain/src/flat_storage_resharder.rs @@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex}; use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle}; use near_chain_primitives::Error; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use crate::resharding::event_type::{ReshardingEventType, ReshardingSplitShardParams}; use crate::resharding::types::{ @@ -1025,9 +1025,16 @@ fn copy_kv_to_child( // Sanity check we are truly writing to one of the expected children shards. if new_shard_uid != *left_child_shard && new_shard_uid != *right_child_shard { - let err_msg = "account id doesn't map to any child shard!"; - error!(target: "resharding", ?new_shard_uid, ?left_child_shard, ?right_child_shard, ?shard_layout, ?account_id, err_msg); - return Err(Error::ReshardingError(err_msg.to_string())); + let err_msg = "account id doesn't map to any child shard! - skipping it"; + warn!(target: "resharding", ?new_shard_uid, ?left_child_shard, ?right_child_shard, ?shard_layout, ?account_id, err_msg); + + // TODO(resharding): add a debug assertion once the root cause is fixed. The current + // hypothesis is that flat storage might contain keys with account_id outside of the shard's + // boundary due to either a bug in the state generation for forknet or corrupted state in + // mainnet. + + // Do not fail resharding. Just skip this entry. + return Ok(()); } // Add the new flat store entry. store_update.set(new_shard_uid, key, value); @@ -2561,4 +2568,63 @@ mod tests { Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head })) ); } + + /// This test asserts that resharding doesn't fail if flat storage iteration goes over an account + /// which is not part of any children shards after the shard layout changes. + #[test] + fn unrelated_account_do_not_fail_splitting() { + init_test_logger(); + let (mut chain, resharder, sender) = + create_chain_resharder_sender::(simple_shard_layout()); + let new_shard_layout = shard_layout_after_split(); + let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); + let ReshardingSplitShardParams { + parent_shard, left_child_shard, right_child_shard, .. + } = match resharding_event_type.clone() { + ReshardingEventType::SplitShard(params) => params, + }; + let flat_store = resharder.runtime.store().flat_store(); + + // Add two blocks on top of genesis. This will make the resharding block (height 0) final. + add_blocks_to_chain( + &mut chain, + 2, + PreviousBlockHeight::ChainHead, + NextBlockHeight::ChainHeadPlusOne, + ); + + // Inject an account which doesn't belong to the parent shard into its flat storage. 
+ let mut store_update = flat_store.store_update(); + let test_value = Some(FlatStateValue::Inlined(vec![0])); + let key = TrieKey::Account { account_id: account!("ab") }; + store_update.set(parent_shard, key.to_vec(), test_value); + store_update.commit().unwrap(); + + // Perform resharding. + assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); + sender.call_split_shard_task(); + + // Check final status of parent flat storage. + let parent = ShardUId { version: 3, shard_id: 1 }; + assert_eq!(flat_store.get_flat_storage_status(parent), Ok(FlatStorageStatus::Empty)); + assert_eq!(flat_store.iter(parent).count(), 0); + assert!(resharder + .runtime + .get_flat_storage_manager() + .get_flat_storage_for_shard(parent) + .is_none()); + + // Check intermediate status of children flat storages. + // If children reached the catching up state, it means that the split task succeeded. + for child in [left_child_shard, right_child_shard] { + assert_eq!( + flat_store.get_flat_storage_status(child), + Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp( + chain.final_head().unwrap().into() + ))) + ); + // However, the unrelated account should not end up in any child. + assert!(flat_store.get(child, &key.to_vec()).is_ok_and(|val| val.is_none())); + } + } } diff --git a/chain/chain/src/resharding/manager.rs b/chain/chain/src/resharding/manager.rs index 9249a30006d..d1a6f4405dc 100644 --- a/chain/chain/src/resharding/manager.rs +++ b/chain/chain/src/resharding/manager.rs @@ -14,6 +14,7 @@ use near_primitives::challenge::PartialState; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout}; use near_primitives::types::chunk_extra::ChunkExtra; +use near_store::adapter::trie_store::get_shard_uid_mapping; use near_store::adapter::{StoreAdapter, StoreUpdateAdapter}; use near_store::flat::BlockInfo; use near_store::trie::mem::mem_trie_update::TrackingMode; @@ -124,7 +125,7 @@ impl ReshardingManager { return Ok(()); } - // Reshard the State column by setting ShardUId mapping from children to parent. + // Reshard the State column by setting ShardUId mapping from children to ancestor. self.set_state_shard_uid_mapping(&split_shard_event)?; // Create temporary children memtries by freezing parent memtrie and referencing it. @@ -146,16 +147,19 @@ impl ReshardingManager { } /// Store in the database the mapping of ShardUId from children to the parent shard, - /// so that subsequent accesses to the State will use the parent shard's UId as a prefix for the database key. + /// so that subsequent accesses to the State will use the ancestor's ShardUId prefix + /// as a prefix for the database key. + // TODO(resharding) add testloop where grandparent ShardUId is used fn set_state_shard_uid_mapping( &mut self, split_shard_event: &ReshardingSplitShardParams, ) -> io::Result<()> { let mut store_update = self.store.trie_store().store_update(); let parent_shard_uid = split_shard_event.parent_shard; + let parent_shard_uid_prefix = get_shard_uid_mapping(&self.store, parent_shard_uid); // TODO(resharding) No need to set the mapping for children shards that we won't track just after resharding? 
        for child_shard_uid in split_shard_event.children_shards() {
-            store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid);
+            store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid_prefix);
         }
         store_update.commit()
     }
diff --git a/chain/chain/src/runtime/mod.rs b/chain/chain/src/runtime/mod.rs
index 671acbdb3fa..5e331cbd0a0 100644
--- a/chain/chain/src/runtime/mod.rs
+++ b/chain/chain/src/runtime/mod.rs
@@ -635,7 +635,7 @@ impl RuntimeAdapter for NightshadeRuntime {
                 // flat storage by not charging gas for trie nodes.
                 // WARNING: should never be used in production! Consider this option only for debugging or replaying blocks.
                 let mut trie = self.tries.get_trie_for_shard(shard_uid, storage_config.state_root);
-                trie.dont_charge_gas_for_trie_node_access();
+                trie.set_charge_gas_for_trie_node_access(false);
                 trie
             }
             StorageDataSource::Recorded(storage) => Trie::from_recorded_storage(
@@ -862,7 +862,7 @@ impl RuntimeAdapter for NightshadeRuntime {
                     storage_config.state_root,
                     false,
                 )?;
-                trie.dont_charge_gas_for_trie_node_access();
+                trie.set_charge_gas_for_trie_node_access(false);
                 trie
             }
             StorageDataSource::Recorded(storage) => Trie::from_recorded_storage(
diff --git a/chain/chain/src/runtime/test_utils.rs b/chain/chain/src/runtime/test_utils.rs
index 2f36e46a40b..2810c5d3e2c 100644
--- a/chain/chain/src/runtime/test_utils.rs
+++ b/chain/chain/src/runtime/test_utils.rs
@@ -48,6 +48,7 @@ impl NightshadeRuntime {
         runtime_config_store: Option,
         trie_config: TrieConfig,
         state_snapshot_type: StateSnapshotType,
+        gc_num_epochs_to_keep: u64,
     ) -> Arc {
         Self::new(
             store,
@@ -57,7 +58,7 @@ impl NightshadeRuntime {
             None,
             None,
             runtime_config_store,
-            DEFAULT_GC_NUM_EPOCHS_TO_KEEP,
+            gc_num_epochs_to_keep,
             trie_config,
             StateSnapshotConfig {
                 state_snapshot_type,
diff --git a/chain/chain/src/store/mod.rs b/chain/chain/src/store/mod.rs
index 0a59b46c640..2affea8801e 100644
--- a/chain/chain/src/store/mod.rs
+++ b/chain/chain/src/store/mod.rs
@@ -1025,16 +1025,35 @@ impl ChainStore {
         key
     }
-    /// Retrieves STATE_SYNC_DUMP for the given shard.
-    pub fn get_state_sync_dump_progress(
-        &self,
-        shard_id: ShardId,
-    ) -> Result {
-        option_to_not_found(
-            self.store
-                .get_ser(DBCol::BlockMisc, &ChainStore::state_sync_dump_progress_key(shard_id)),
-            format!("STATE_SYNC_DUMP:{}", shard_id),
-        )
+    /// For each value stored, this returns an (EpochId, bool), where the bool tells whether it's finished,
+    /// because those are the only fields we really care about.
+    pub fn iter_state_sync_dump_progress<'a>(
+        &'a self,
+    ) -> impl Iterator> + 'a {
+        self.store
+            .iter_prefix_ser::(DBCol::BlockMisc, STATE_SYNC_DUMP_KEY)
+            .map(|item| {
+                item.and_then(|(key, progress)| {
+                    // + 1 for the ':'
+                    let prefix_len = STATE_SYNC_DUMP_KEY.len() + 1;
+                    let int_part = &key[prefix_len..];
+                    let int_part = int_part.try_into().map_err(|_| {
+                        io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            format!("Bad StateSyncDump column key length: {}", key.len()),
+                        )
+                    })?;
+                    let shard_id = ShardId::from_le_bytes(int_part);
+                    Ok((
+                        shard_id,
+                        match progress {
+                            StateSyncDumpProgress::AllDumped { epoch_id, .. } => (epoch_id, true),
+                            StateSyncDumpProgress::InProgress { epoch_id, .. } => (epoch_id, false),
+                            StateSyncDumpProgress::Skipped { epoch_id, .. } => (epoch_id, true),
+                        },
+                    ))
+                })
+            })
     }
     /// Updates STATE_SYNC_DUMP for the given shard.
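
The new `iter_state_sync_dump_progress` above recovers the shard id from the tail of each `STATE_SYNC_DUMP` key in `DBCol::BlockMisc`. Below is a minimal standalone sketch of that key round trip, assuming the layout is the textual prefix, a ':' separator, and the shard id as 8 little-endian bytes; the helper names are illustrative only, not nearcore APIs.

// Sketch only: mirrors the key parsing assumed by iter_state_sync_dump_progress.
const STATE_SYNC_DUMP_KEY: &[u8] = b"STATE_SYNC_DUMP";

fn make_progress_key(shard_id: u64) -> Vec<u8> {
    let mut key = STATE_SYNC_DUMP_KEY.to_vec();
    key.push(b':');
    key.extend_from_slice(&shard_id.to_le_bytes());
    key
}

fn parse_shard_id(key: &[u8]) -> Option<u64> {
    // Skip the prefix plus the ':' separator, then read the 8-byte little-endian id.
    let int_part = key.get(STATE_SYNC_DUMP_KEY.len() + 1..)?;
    let bytes: [u8; 8] = int_part.try_into().ok()?;
    Some(u64::from_le_bytes(bytes))
}

fn main() {
    let key = make_progress_key(3);
    assert_eq!(parse_shard_id(&key), Some(3));
}
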
diff --git a/chain/client/src/sync/state/shard.rs b/chain/client/src/sync/state/shard.rs index 0ef25a07bd6..ae65276bea8 100644 --- a/chain/client/src/sync/state/shard.rs +++ b/chain/client/src/sync/state/shard.rs @@ -2,9 +2,8 @@ use super::downloader::StateSyncDownloader; use super::task_tracker::TaskTracker; use crate::metrics; use crate::sync::state::chain_requests::ChainFinalizationRequest; -use crate::sync::state::util::query_epoch_id_and_height_for_block; use futures::{StreamExt, TryStreamExt}; -use near_async::futures::{FutureSpawner, FutureSpawnerExt}; +use near_async::futures::{respawn_for_parallelism, FutureSpawner}; use near_async::messaging::AsyncSender; use near_chain::types::RuntimeAdapter; use near_chain::BlockHeader; @@ -15,6 +14,7 @@ use near_primitives::sharding::ShardChunk; use near_primitives::state_part::PartId; use near_primitives::state_sync::StatePartKey; use near_primitives::types::{EpochId, ShardId}; +use near_primitives::version::PROTOCOL_VERSION; use near_store::adapter::{StoreAdapter, StoreUpdateAdapter}; use near_store::flat::{FlatStorageReadyStatus, FlatStorageStatus}; use near_store::{DBCol, ShardUId, Store}; @@ -162,8 +162,6 @@ pub(super) async fn run_state_sync_for_shard( return_if_cancelled!(cancel); // Create flat storage. { - let (epoch_id, _) = query_epoch_id_and_height_for_block(&store, sync_hash)?; - let shard_uid = epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?; let chunk = header.cloned_chunk(); let block_hash = chunk.prev_block(); @@ -180,8 +178,15 @@ pub(super) async fn run_state_sync_for_shard( // Load memtrie. { let handle = computation_task_tracker.get_handle(&format!("shard {}", shard_id)).await; + let head_protocol_version = epoch_manager.get_epoch_protocol_version(&epoch_id)?; + let shard_uids_pending_resharding = epoch_manager + .get_shard_uids_pending_resharding(head_protocol_version, PROTOCOL_VERSION)?; handle.set_status("Loading memtrie"); - runtime.get_tries().load_mem_trie_on_catchup(&shard_uid, &state_root)?; + runtime.get_tries().load_mem_trie_on_catchup( + &shard_uid, + &state_root, + &shard_uids_pending_resharding, + )?; } return_if_cancelled!(cancel); @@ -275,21 +280,3 @@ async fn apply_state_part( )?; Ok(()) } - -/// Given a future, respawn it as an equivalent future but which does not block the -/// driver of the future. For example, if the given future directly performs -/// computation, normally the whoever drives the future (such as a buffered_unordered) -/// would be blocked by the computation, thereby not allowing computation of other -/// futures driven by the same driver to proceed. This function respawns the future -/// onto the FutureSpawner, so the driver of the returned future would not be blocked. 
-fn respawn_for_parallelism( - future_spawner: &dyn FutureSpawner, - name: &'static str, - f: impl std::future::Future + Send + 'static, -) -> impl std::future::Future + Send + 'static { - let (sender, receiver) = tokio::sync::oneshot::channel(); - future_spawner.spawn(name, async move { - sender.send(f.await).ok(); - }); - async move { receiver.await.unwrap() } -} diff --git a/chain/indexer/src/lib.rs b/chain/indexer/src/lib.rs index 754f3267f6b..70f314c7624 100644 --- a/chain/indexer/src/lib.rs +++ b/chain/indexer/src/lib.rs @@ -6,7 +6,7 @@ use tokio::sync::mpsc; use near_chain_configs::GenesisValidationMode; pub use near_primitives; -use near_primitives::types::Gas; +use near_primitives::types::{Finality, Gas}; pub use nearcore::{get_default_home, init_configs, NearConfig}; pub use near_indexer_primitives::{ @@ -83,6 +83,8 @@ pub struct IndexerConfig { pub sync_mode: SyncModeEnum, /// Whether await for node to be synced or not pub await_for_node_synced: AwaitForNodeSyncedEnum, + /// Finality level at which blocks are streamed + pub finality: Finality, /// Tells whether to validate the genesis file before starting pub validate_genesis: bool, } diff --git a/chain/indexer/src/streamer/fetchers.rs b/chain/indexer/src/streamer/fetchers.rs index 9e5d93b4665..bdae2b3bdd0 100644 --- a/chain/indexer/src/streamer/fetchers.rs +++ b/chain/indexer/src/streamer/fetchers.rs @@ -29,12 +29,13 @@ pub(crate) async fn fetch_status( /// entire block or we already fetched this block. pub(crate) async fn fetch_latest_block( client: &Addr, + finality: &near_primitives::types::Finality, ) -> Result { tracing::debug!(target: INDEXER, "Fetching latest block"); client .send( near_client::GetBlock(near_primitives::types::BlockReference::Finality( - near_primitives::types::Finality::Final, + finality.clone(), )) .with_span_context(), ) diff --git a/chain/indexer/src/streamer/mod.rs b/chain/indexer/src/streamer/mod.rs index 43a66b761d0..677096be693 100644 --- a/chain/indexer/src/streamer/mod.rs +++ b/chain/indexer/src/streamer/mod.rs @@ -37,7 +37,7 @@ static DELAYED_LOCAL_RECEIPTS_CACHE: std::sync::LazyLock< Arc>>, > = std::sync::LazyLock::new(|| Arc::new(RwLock::new(HashMap::new()))); -const INTERVAL: Duration = Duration::from_millis(500); +const INTERVAL: Duration = Duration::from_millis(250); /// Blocks #47317863 and #47317864 with restored receipts. const PROBLEMATIC_BLOCKS: [CryptoHash; 2] = [ @@ -413,11 +413,12 @@ pub(crate) async fn start( AwaitForNodeSyncedEnum::StreamWhileSyncing => {} }; - let block = if let Ok(block) = fetch_latest_block(&view_client).await { - block - } else { - continue; - }; + let block = + if let Ok(block) = fetch_latest_block(&view_client, &indexer_config.finality).await { + block + } else { + continue; + }; let latest_block_height = block.header.height; let start_syncing_block_height = if let Some(last_synced_block_height) = diff --git a/core/async/src/futures.rs b/core/async/src/futures.rs index 196a2086da0..14bb3f1a3a5 100644 --- a/core/async/src/futures.rs +++ b/core/async/src/futures.rs @@ -42,6 +42,24 @@ impl FutureSpawnerExt for dyn FutureSpawner + '_ { } } +/// Given a future, respawn it as an equivalent future but which does not block the +/// driver of the future. For example, if the given future directly performs +/// computation, normally the whoever drives the future (such as a buffered_unordered) +/// would be blocked by the computation, thereby not allowing computation of other +/// futures driven by the same driver to proceed. 
This function respawns the future +/// onto the FutureSpawner, so the driver of the returned future would not be blocked. +pub fn respawn_for_parallelism( + future_spawner: &dyn FutureSpawner, + name: &'static str, + f: impl std::future::Future + Send + 'static, +) -> impl std::future::Future + Send + 'static { + let (sender, receiver) = tokio::sync::oneshot::channel(); + future_spawner.spawn(name, async move { + sender.send(f.await).ok(); + }); + async move { receiver.await.unwrap() } +} + /// A FutureSpawner that hands over the future to Actix. pub struct ActixFutureSpawner; diff --git a/core/store/src/trie/mod.rs b/core/store/src/trie/mod.rs index d998f5129b0..c75de1e5cf6 100644 --- a/core/store/src/trie/mod.rs +++ b/core/store/src/trie/mod.rs @@ -694,11 +694,16 @@ impl Trie { )))), None => RefCell::new(TrieAccountingCache::new(None)), }; + // Technically the charge_gas_for_trie_node_access should be set based + // on the flat storage protocol feature. When flat storage is enabled + // the trie node access should be free and the charge flag should be set + // to false. + let charge_gas_for_trie_node_access = false; Trie { storage, memtries, root, - charge_gas_for_trie_node_access: flat_storage_chunk_view.is_none(), + charge_gas_for_trie_node_access, flat_storage_chunk_view, accounting_cache, recorder: None, @@ -711,8 +716,8 @@ impl Trie { } /// Helper to simulate gas costs as if flat storage was present. - pub fn dont_charge_gas_for_trie_node_access(&mut self) { - self.charge_gas_for_trie_node_access = false; + pub fn set_charge_gas_for_trie_node_access(&mut self, value: bool) { + self.charge_gas_for_trie_node_access = value; } /// Makes a new trie that has everything the same except that access diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index 40bf4717c5e..0ca4e580a4f 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -429,12 +429,16 @@ impl ShardTries { /// Loads in-memory trie upon catchup, if it is enabled. /// Requires state root because `ChunkExtra` is not available at the time mem-trie is being loaded. + /// Mem-tries of shards that are pending resharding must be loaded in any case. 
pub fn load_mem_trie_on_catchup( &self, shard_uid: &ShardUId, state_root: &StateRoot, + shard_uids_pending_resharding: &HashSet, ) -> Result<(), StorageError> { - if !self.0.trie_config.load_mem_tries_for_tracked_shards { + if !self.0.trie_config.load_mem_tries_for_tracked_shards + && !shard_uids_pending_resharding.contains(shard_uid) + { return Ok(()); } // It should not happen that memtrie is already loaded for a shard diff --git a/core/store/src/trie/trie_recording.rs b/core/store/src/trie/trie_recording.rs index c38180cf715..99559fabb24 100644 --- a/core/store/src/trie/trie_recording.rs +++ b/core/store/src/trie/trie_recording.rs @@ -469,7 +469,9 @@ mod trie_recording_tests { false, ) } else { - tries.get_trie_for_shard(shard_uid, state_root) + let mut trie = tries.get_trie_for_shard(shard_uid, state_root); + trie.charge_gas_for_trie_node_access = true; + trie } } diff --git a/core/store/src/trie/trie_tests.rs b/core/store/src/trie/trie_tests.rs index 1cae1d30e96..da916ee4712 100644 --- a/core/store/src/trie/trie_tests.rs +++ b/core/store/src/trie/trie_tests.rs @@ -183,7 +183,8 @@ mod nodes_counter_tests { (create_trie_key(&[0, 1, 1]), Some(vec![1])), (create_trie_key(&[1, 0, 0]), Some(vec![2])), ]; - let trie = create_trie(&trie_items); + let mut trie = create_trie(&trie_items); + trie.charge_gas_for_trie_node_access = true; assert_eq!(get_touched_nodes_numbers(&trie, &trie_items), vec![5, 5, 4]); } @@ -197,7 +198,8 @@ mod nodes_counter_tests { (create_trie_key(&[0, 0]), Some(vec![1])), (create_trie_key(&[1, 1]), Some(vec![1])), ]; - let trie = create_trie(&trie_items); + let mut trie = create_trie(&trie_items); + trie.charge_gas_for_trie_node_access = true; assert_eq!(get_touched_nodes_numbers(&trie, &trie_items), vec![4, 4]); } } diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs index fa687ec99f7..aabffeaff05 100644 --- a/integration-tests/src/test_loop/builder.rs +++ b/integration-tests/src/test_loop/builder.rs @@ -593,6 +593,7 @@ impl TestLoopBuilder { self.runtime_config_store.clone(), TrieConfig::from_store_config(&store_config), StateSnapshotType::EveryEpoch, + client_config.gc.gc_num_epochs_to_keep, ); let state_snapshot = StateSnapshotActor::new( @@ -671,6 +672,7 @@ impl TestLoopBuilder { self.runtime_config_store.clone(), TrieConfig::from_store_config(&store_config), StateSnapshotType::EveryEpoch, + client_config.gc.gc_num_epochs_to_keep, ); (view_epoch_manager, view_shard_tracker, view_runtime_adapter) } else { @@ -755,6 +757,7 @@ impl TestLoopBuilder { future_spawner.spawn_boxed("state_sync_dumper", future); Box::new(|| {}) }), + future_spawner: Arc::new(self.test_loop.future_spawner()), handle: None, }; let state_sync_dumper_handle = self.test_loop.data.register_data(state_sync_dumper); diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs index f6064417095..727323458be 100644 --- a/integration-tests/src/test_loop/tests/resharding_v3.rs +++ b/integration-tests/src/test_loop/tests/resharding_v3.rs @@ -21,8 +21,9 @@ use crate::test_loop::utils::receipts::{ #[cfg(feature = "test_features")] use crate::test_loop::utils::resharding::fork_before_resharding_block; use crate::test_loop::utils::resharding::{ - call_burn_gas_contract, call_promise_yield, execute_money_transfers, - temporary_account_during_resharding, + call_burn_gas_contract, call_promise_yield, check_state_cleanup_after_resharding, + execute_money_transfers, execute_storage_operations, 
temporary_account_during_resharding, + TrackedShardSchedule, }; use crate::test_loop::utils::sharding::print_and_assert_shard_accounts; use crate::test_loop::utils::transactions::{ @@ -45,6 +46,18 @@ const DEFAULT_EPOCH_LENGTH: u64 = 6; /// and later we would hit the `DBNotFoundErr("Transaction ...)` error in tests. const INCREASED_EPOCH_LENGTH: u64 = 8; +/// Garbage collection window length. +const GC_NUM_EPOCHS_TO_KEEP: u64 = 3; + +/// Maximum number of epochs under which the test should finish. +const TESTLOOP_NUM_EPOCHS_TO_WAIT: u64 = 8; + +/// Default shard layout version used in resharding tests. +const DEFAULT_SHARD_LAYOUT_VERSION: u64 = 2; + +/// Account used in resharding tests as a split boundary. +const NEW_BOUNDARY_ACCOUNT: &str = "account6"; + #[derive(derive_builder::Builder)] #[builder(pattern = "owned", build_fn(skip))] #[allow(unused)] @@ -84,6 +97,12 @@ struct TestReshardingParameters { chunk_ranges_to_drop: HashMap>, shuffle_shard_assignment_for_chunk_producers: bool, track_all_shards: bool, + // Manually specify what shards will be tracked for a given client ID. + // The client ID must not be used for any other role (validator, RPC, etc.). + // The schedule length must be more than `TESTLOOP_NUM_EPOCHS_TO_WAIT` so that it covers all epoch heights used in the test. + // The suffix must consist of `GC_NUM_EPOCHS_TO_KEEP` repetitions of the same shard, + // so that we can assert at the end of the test that the state of all other shards have been cleaned up. + tracked_shard_schedule: Option, load_mem_tries_for_tracked_shards: bool, /// Custom behavior executed at every iteration of test loop. #[builder(setter(custom))] @@ -115,7 +134,10 @@ struct TestReshardingParameters { impl TestReshardingParametersBuilder { fn build(self) -> TestReshardingParameters { + // Give enough time for GC to kick in after resharding. + assert!(GC_NUM_EPOCHS_TO_KEEP + 2 < TESTLOOP_NUM_EPOCHS_TO_WAIT); let epoch_length = self.epoch_length.unwrap_or(DEFAULT_EPOCH_LENGTH); + let tracked_shard_schedule = self.tracked_shard_schedule.unwrap_or(None); let num_accounts = self.num_accounts.unwrap_or(8); let num_clients = self.num_clients.unwrap_or(7); @@ -123,8 +145,12 @@ impl TestReshardingParametersBuilder { let num_validators = self.num_validators.unwrap_or(2); let num_rpcs = self.num_rpcs.unwrap_or(1); let num_archivals = self.num_archivals.unwrap_or(1); + let num_extra_nodes = if tracked_shard_schedule.is_some() { 1 } else { 0 }; - assert!(num_clients >= num_producers + num_validators + num_rpcs + num_archivals); + assert!( + num_clients + >= num_producers + num_validators + num_rpcs + num_archivals + num_extra_nodes + ); // #12195 prevents number of BPs bigger than `epoch_length`. 
assert!(num_producers > 0 && num_producers <= epoch_length); @@ -157,9 +183,23 @@ impl TestReshardingParametersBuilder { let validators = validators.to_vec(); let (rpcs, tmp) = tmp.split_at(num_rpcs as usize); let rpcs = rpcs.to_vec(); - let (archivals, _) = tmp.split_at(num_archivals as usize); + let (archivals, clients_without_role) = tmp.split_at(num_archivals as usize); let archivals = archivals.to_vec(); + if let Some(tracked_shard_schedule) = &tracked_shard_schedule { + assert!(clients_without_role.contains(&clients[tracked_shard_schedule.client_index])); + let schedule_length = tracked_shard_schedule.schedule.len(); + assert!(schedule_length > TESTLOOP_NUM_EPOCHS_TO_WAIT as usize); + for i in + (TESTLOOP_NUM_EPOCHS_TO_WAIT - GC_NUM_EPOCHS_TO_KEEP - 1) as usize..schedule_length + { + assert_eq!( + tracked_shard_schedule.schedule[i - 1], + tracked_shard_schedule.schedule[i] + ); + } + } + let client_index = if rpcs.is_empty() { 0 } else { num_producers + num_validators } as usize; let client_id = clients[client_index].clone(); @@ -167,10 +207,12 @@ impl TestReshardingParametersBuilder { println!("Clients setup:"); println!("Producers: {producers:?}"); println!("Validators: {validators:?}"); - println!("Rpcs: {rpcs:?}, to serve requests we use client: {client_id}"); + println!("Rpcs: {rpcs:?}"); println!("Archivals: {archivals:?}"); + println!("To serve requests, we use client: {client_id}"); + println!("Num extra nodes: {num_extra_nodes}"); - let new_boundary_account: AccountId = "account6".parse().unwrap(); + let new_boundary_account: AccountId = NEW_BOUNDARY_ACCOUNT.parse().unwrap(); let temporary_account_id: AccountId = format!("{}.{}", new_boundary_account, new_boundary_account).parse().unwrap(); let mut loop_actions = self.loop_actions.unwrap_or_default(); @@ -186,7 +228,9 @@ impl TestReshardingParametersBuilder { } TestReshardingParameters { - base_shard_layout_version: self.base_shard_layout_version.unwrap_or(2), + base_shard_layout_version: self + .base_shard_layout_version + .unwrap_or(DEFAULT_SHARD_LAYOUT_VERSION), num_accounts, num_clients, num_producers, @@ -208,6 +252,7 @@ impl TestReshardingParametersBuilder { .shuffle_shard_assignment_for_chunk_producers .unwrap_or(false), track_all_shards: self.track_all_shards.unwrap_or(false), + tracked_shard_schedule, load_mem_tries_for_tracked_shards: self .load_mem_tries_for_tracked_shards .unwrap_or(true), @@ -266,12 +311,20 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { init_test_logger(); let mut builder = TestLoopBuilder::new(); + let tracked_shard_schedule = params.tracked_shard_schedule.clone(); - // Adjust the resharding configuration to make the tests faster. - builder = builder.config_modifier(|config, _| { + builder = builder.config_modifier(move |config, client_index| { + // Adjust the resharding configuration to make the tests faster. let mut resharding_config = config.resharding_config.get(); resharding_config.batch_delay = Duration::milliseconds(1); config.resharding_config.update(resharding_config); + // Set the tracked shard schedule if specified for the client at the given index. + if let Some(tracked_shard_schedule) = &tracked_shard_schedule { + if client_index == tracked_shard_schedule.client_index { + config.tracked_shards = vec![]; + config.tracked_shard_schedule = tracked_shard_schedule.schedule.clone(); + } + } }); // Prepare shard split configuration. 
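
For reference, the schedule-suffix assertion in `TestReshardingParametersBuilder::build` above can be checked against a concrete example. The sketch below is plain Rust, not test-loop code; it reuses the same constants (TESTLOOP_NUM_EPOCHS_TO_WAIT = 8, GC_NUM_EPOCHS_TO_KEEP = 3), so the loop compares neighbouring entries from index 4 onward, forcing the tail of the schedule to track one shard.

fn main() {
    let num_epochs_to_wait: u64 = 8; // TESTLOOP_NUM_EPOCHS_TO_WAIT
    let gc_num_epochs_to_keep: u64 = 3; // GC_NUM_EPOCHS_TO_KEEP
    // Example schedule: parent shard for two epochs, then a child, then an
    // unrelated shard repeated until the end of the test (shard ids are made up).
    let schedule: Vec<Vec<u64>> =
        vec![vec![1], vec![1], vec![2], vec![0], vec![0], vec![0], vec![0], vec![0], vec![0]];
    assert!(schedule.len() > num_epochs_to_wait as usize);
    // Same check as in the builder: the tail of the schedule must be constant.
    for i in (num_epochs_to_wait - gc_num_epochs_to_keep - 1) as usize..schedule.len() {
        assert_eq!(schedule[i - 1], schedule[i]);
    }
}
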
@@ -356,6 +409,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { base_protocol_version + 1, params.chunk_ranges_to_drop.clone(), ) + .gc_num_epochs_to_keep(GC_NUM_EPOCHS_TO_KEEP) .build(); let mut test_setup_transactions = vec![]; @@ -403,7 +457,6 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { client_handles.iter().map(|handle| &env.test_loop.data.get(handle).client).collect_vec(); let mut trie_sanity_check = TrieSanityCheck::new(&clients, params.load_mem_tries_for_tracked_shards); - let gc_num_epochs_to_keep = clients[client_index].config.gc.gc_num_epochs_to_keep; let latest_block_height = Cell::new(0u64); // Height of a block after resharding. @@ -430,9 +483,13 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { } latest_block_height.set(tip.height); - // Check that all chunks are included. let client = clients[client_index]; let block_header = client.chain.get_block_header(&tip.last_block_hash).unwrap(); + let shard_layout = client.epoch_manager.get_shard_layout(&tip.epoch_id).unwrap(); + println!("Block: {:?} {} {:?}", tip.last_block_hash, tip.height, block_header.chunk_mask()); + println!("Shard IDs: {:?}", shard_layout.shard_ids().collect_vec()); + + // Check that all chunks are included. if params.all_chunks_expected && params.chunk_ranges_to_drop.is_empty() { assert!(block_header.chunk_mask().iter().all(|chunk_bit| *chunk_bit)); } @@ -456,6 +513,8 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { // Just passed an epoch with increased number of shards. new_layout_block_height.set(Some(latest_block_height.get())); new_layout_epoch_height.set(Some(epoch_height)); + // Assert that we will have a chance for gc to kick in before the test is over. + assert!(epoch_height + GC_NUM_EPOCHS_TO_KEEP < TESTLOOP_NUM_EPOCHS_TO_WAIT); println!("State after resharding:"); print_and_assert_shard_accounts(&clients, &tip); } @@ -467,7 +526,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { ); // Return false if garbage collection window has not passed yet since resharding. - if epoch_height <= new_layout_epoch_height.get().unwrap() + gc_num_epochs_to_keep { + if epoch_height <= new_layout_epoch_height.get().unwrap() + GC_NUM_EPOCHS_TO_KEEP { return false; } for loop_action in ¶ms.loop_actions { @@ -478,8 +537,8 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { env.test_loop.run_until( success_condition, - // Give enough time to produce ~7 epochs. - Duration::seconds((7 * params.epoch_length) as i64), + // Give enough time to produce ~TESTLOOP_NUM_EPOCHS_TO_WAIT epochs. + Duration::seconds((TESTLOOP_NUM_EPOCHS_TO_WAIT * params.epoch_length) as i64), ); let client = &env.test_loop.data.get(&client_handles[client_index]).client; trie_sanity_check.check_epochs(client); @@ -492,6 +551,48 @@ fn test_resharding_v3() { test_resharding_v3_base(TestReshardingParametersBuilder::default().build()); } +// Takes a sequence of shard ids to track in consecutive epochs, +// repeats the last element `TESTLOOP_NUM_EPOCHS_TO_WAIT` times, +// and maps each element: |id| -> vec![id], to the format required by `TrackedShardSchedule`. 
+fn shard_sequence_to_schedule(mut shard_sequence: Vec) -> Vec> { + shard_sequence.extend( + std::iter::repeat(*shard_sequence.last().unwrap()) + .take(TESTLOOP_NUM_EPOCHS_TO_WAIT as usize), + ); + shard_sequence.iter().map(|shard_id| vec![*shard_id]).collect() +} + +#[test] +// TODO(resharding): fix nearcore and un-ignore this test +#[ignore] +fn test_resharding_v3_state_cleanup() { + // Track parent shard before resharding, child shard after resharding, and then an unrelated shard forever. + // Eventually, the State column should only contain entries belonging to the last tracked shard. + let account_in_stable_shard: AccountId = "account0".parse().unwrap(); + let split_boundary_account: AccountId = NEW_BOUNDARY_ACCOUNT.parse().unwrap(); + let base_shard_layout = get_base_shard_layout(DEFAULT_SHARD_LAYOUT_VERSION); + let new_shard_layout = + ShardLayout::derive_shard_layout(&base_shard_layout, split_boundary_account.clone()); + let parent_shard_id = base_shard_layout.account_id_to_shard_id(&split_boundary_account); + let child_shard_id = new_shard_layout.account_id_to_shard_id(&split_boundary_account); + let unrelated_shard_id = new_shard_layout.account_id_to_shard_id(&account_in_stable_shard); + + let tracked_shard_sequence = + vec![parent_shard_id, parent_shard_id, child_shard_id, unrelated_shard_id]; + let num_clients = 8; + let tracked_shard_schedule = TrackedShardSchedule { + client_index: (num_clients - 1) as usize, + schedule: shard_sequence_to_schedule(tracked_shard_sequence), + }; + test_resharding_v3_base( + TestReshardingParametersBuilder::default() + .num_clients(num_clients) + .tracked_shard_schedule(Some(tracked_shard_schedule.clone())) + .add_loop_action(check_state_cleanup_after_resharding(tracked_shard_schedule)) + .build(), + ); +} + #[test] fn test_resharding_v3_track_all_shards() { test_resharding_v3_base( @@ -564,7 +665,9 @@ fn test_resharding_v3_resharding_block_in_fork() { #[test] // TODO(resharding): fix nearcore and un-ignore this test // TODO(resharding): duplicate this test so that in one case resharding is performed on block -// B(height=13) and in another case resharding is performed on block B'(height=13) +// B(height=13) and in another case resharding is performed on block B'(height=13). +// In the current scenario the real resharding happens on block B'. Low priority TODO +// since it's a very rare corner case. #[ignore] #[cfg(feature = "test_features")] fn test_resharding_v3_double_sign_resharding_block() { @@ -588,6 +691,38 @@ fn test_resharding_v3_shard_shuffling() { test_resharding_v3_base(params); } +/// This tests an edge case where we track the parent in the pre-resharding epoch, then we +/// track an unrelated shard in the first epoch after resharding, then we track a child of the resharding +/// in the next epoch after that. In that case we don't want to state sync because we can just perform +/// the resharding and continue applying chunks for the child in the first epoch post-resharding. 
+#[test] +fn test_resharding_v3_shard_shuffling_untrack_then_track() { + let account_in_stable_shard: AccountId = "account0".parse().unwrap(); + let split_boundary_account: AccountId = NEW_BOUNDARY_ACCOUNT.parse().unwrap(); + let base_shard_layout = get_base_shard_layout(DEFAULT_SHARD_LAYOUT_VERSION); + let new_shard_layout = + ShardLayout::derive_shard_layout(&base_shard_layout, split_boundary_account.clone()); + let parent_shard_id = base_shard_layout.account_id_to_shard_id(&split_boundary_account); + let child_shard_id = new_shard_layout.account_id_to_shard_id(&split_boundary_account); + let unrelated_shard_id = new_shard_layout.account_id_to_shard_id(&account_in_stable_shard); + + let tracked_shard_sequence = + vec![parent_shard_id, parent_shard_id, unrelated_shard_id, child_shard_id]; + let num_clients = 8; + let tracked_shard_schedule = TrackedShardSchedule { + client_index: (num_clients - 1) as usize, + schedule: shard_sequence_to_schedule(tracked_shard_sequence), + }; + let params = TestReshardingParametersBuilder::default() + .shuffle_shard_assignment_for_chunk_producers(true) + .num_clients(num_clients) + .tracked_shard_schedule(Some(tracked_shard_schedule)) + // TODO(resharding): uncomment after fixing test_resharding_v3_state_cleanup() + //.add_loop_action(check_state_cleanup_after_resharding(tracked_shard_schedule)) + .build(); + test_resharding_v3_base(params); +} + #[test] fn test_resharding_v3_shard_shuffling_intense() { let chunk_ranges_to_drop = HashMap::from([(0, -1..2), (1, -3..0), (2, -3..3), (3, 0..1)]); @@ -603,6 +738,24 @@ fn test_resharding_v3_shard_shuffling_intense() { test_resharding_v3_base(params); } +/// Executes storage operations at every block height. +/// In particular, checks that storage gas costs are computed correctly during +/// resharding. Caught a bug with invalid storage costs computed during flat +/// storage resharding. +#[test] +fn test_resharding_v3_storage_operations() { + let sender_account: AccountId = "account1".parse().unwrap(); + let account_in_parent: AccountId = "account4".parse().unwrap(); + let params = TestReshardingParametersBuilder::default() + .deploy_test_contract(account_in_parent.clone()) + .add_loop_action(execute_storage_operations(sender_account, account_in_parent)) + .all_chunks_expected(true) + .delay_flat_state_resharding(2) + .epoch_length(13) + .build(); + test_resharding_v3_base(params); +} + #[test] #[cfg_attr(not(feature = "test_features"), ignore)] fn test_resharding_v3_delayed_receipts_left_child() { @@ -770,8 +923,6 @@ fn test_resharding_v3_load_mem_trie_v1() { let params = TestReshardingParametersBuilder::default() .base_shard_layout_version(1) .load_mem_tries_for_tracked_shards(false) - // TODO(resharding): should it work without tracking all shards? - .track_all_shards(true) .build(); test_resharding_v3_base(params); } @@ -781,8 +932,6 @@ fn test_resharding_v3_load_mem_trie_v2() { let params = TestReshardingParametersBuilder::default() .base_shard_layout_version(2) .load_mem_tries_for_tracked_shards(false) - // TODO(resharding): should it work without tracking all shards? 
- .track_all_shards(true) .build(); test_resharding_v3_base(params); } diff --git a/integration-tests/src/test_loop/utils/resharding.rs b/integration-tests/src/test_loop/utils/resharding.rs index 005fc3f3fad..c1d2ceeec47 100644 --- a/integration-tests/src/test_loop/utils/resharding.rs +++ b/integration-tests/src/test_loop/utils/resharding.rs @@ -1,16 +1,26 @@ use std::cell::Cell; +use std::collections::HashSet; +use std::num::NonZero; use assert_matches::assert_matches; +use borsh::BorshDeserialize; use itertools::Itertools; use near_async::test_loop::data::TestLoopData; +use near_chain::ChainStoreAccess; +use near_client::Client; use near_client::{Query, QueryError::GarbageCollectedBlock}; use near_crypto::Signer; +use near_primitives::action::{Action, FunctionCallAction}; +use near_primitives::hash::CryptoHash; use near_primitives::test_utils::create_user_test_signer; use near_primitives::transaction::SignedTransaction; -use near_primitives::types::{AccountId, BlockId, BlockReference, Gas}; +use near_primitives::types::{AccountId, BlockId, BlockReference, Gas, ShardId}; use near_primitives::views::{ FinalExecutionStatus, QueryRequest, QueryResponse, QueryResponseKind, }; +use near_store::adapter::StoreAdapter; +use near_store::db::refcount::decode_value_with_rc; +use near_store::{DBCol, ShardUId}; use rand::seq::SliceRandom; use rand::{Rng, SeedableRng}; use rand_chacha::ChaCha20Rng; @@ -24,6 +34,14 @@ use crate::test_loop::utils::transactions::{ }; use crate::test_loop::utils::{get_node_data, retrieve_client_actor, ONE_NEAR, TGAS}; +/// A config to tell what shards will be tracked by the client at the given index. +/// For more details, see `TrackedConfig::Schedule`. +#[derive(Clone, Debug)] +pub(crate) struct TrackedShardSchedule { + pub client_index: usize, + pub schedule: Vec>, +} + // Returns a callable function that, when invoked inside a test loop iteration, can force the creation of a chain fork. #[cfg(feature = "test_features")] pub(crate) fn fork_before_resharding_block(double_signing: bool) -> LoopAction { @@ -122,6 +140,97 @@ pub(crate) fn execute_money_transfers(account_ids: Vec) -> LoopAction LoopAction::new(action_fn, succeeded) } +/// Returns a loop action that makes storage read and write at every block +/// height. +pub(crate) fn execute_storage_operations( + sender_id: AccountId, + receiver_id: AccountId, +) -> LoopAction { + const TX_CHECK_DEADLINE: u64 = 5; + let latest_height = Cell::new(0); + let txs = Cell::new(vec![]); + let nonce = Cell::new(102); + + let (ran_transfers, succeeded) = LoopAction::shared_success_flag(); + + let action_fn = Box::new( + move |node_datas: &[TestData], + test_loop_data: &mut TestLoopData, + client_account_id: AccountId| { + let client_actor = + retrieve_client_actor(node_datas, test_loop_data, &client_account_id); + let tip = client_actor.client.chain.head().unwrap(); + + // Run this action only once at every block height. 
+ if latest_height.get() == tip.height { + return; + } + latest_height.set(tip.height); + + let mut remaining_txs = vec![]; + for (tx, tx_height) in txs.take() { + if tx_height + TX_CHECK_DEADLINE >= tip.height { + remaining_txs.push((tx, tx_height)); + continue; + } + + let tx_outcome = client_actor.client.chain.get_partial_transaction_result(&tx); + let status = tx_outcome.as_ref().map(|o| o.status.clone()); + assert_matches!(status, Ok(FinalExecutionStatus::SuccessValue(_))); + } + txs.set(remaining_txs); + + let clients = node_datas + .iter() + .map(|test_data| { + &test_loop_data.get(&test_data.client_sender.actor_handle()).client + }) + .collect_vec(); + + // Send transaction which reads a key and writes a key-value pair + // to the contract storage. + let anchor_hash = get_anchor_hash(&clients); + let gas = 20 * TGAS; + let salt = 2 * tip.height; + nonce.set(nonce.get() + 1); + let read_action = Action::FunctionCall(Box::new(FunctionCallAction { + args: near_primitives::test_utils::encode(&[salt]), + method_name: "read_value".to_string(), + gas, + deposit: 0, + })); + let write_action = Action::FunctionCall(Box::new(FunctionCallAction { + args: near_primitives::test_utils::encode(&[salt + 1, salt * 10]), + method_name: "write_key_value".to_string(), + gas, + deposit: 0, + })); + let tx = SignedTransaction::from_actions( + nonce.get(), + sender_id.clone(), + receiver_id.clone(), + &create_user_test_signer(&sender_id).into(), + vec![read_action, write_action], + anchor_hash, + 0, + ); + + store_and_submit_tx( + &node_datas, + &client_account_id, + &txs, + &sender_id, + &receiver_id, + tip.height, + tx, + ); + ran_transfers.set(true); + }, + ); + + LoopAction::new(action_fn, succeeded) +} + /// Returns a loop action that invokes a costly method from a contract /// `CALLS_PER_BLOCK_HEIGHT` times per block height. /// @@ -481,3 +590,100 @@ pub(crate) fn temporary_account_during_resharding( ); LoopAction::new(action_fn, succeeded) } + +/// Removes from State column all entries where key does not start with `the_only_shard_uid` ShardUId prefix. +fn retain_the_only_shard_state(client: &Client, the_only_shard_uid: ShardUId) { + let store = client.chain.chain_store.store().trie_store(); + let mut store_update = store.store_update(); + for kv in store.store().iter_raw_bytes(DBCol::State) { + let (key, value) = kv.unwrap(); + let shard_uid = ShardUId::try_from_slice(&key[0..8]).unwrap(); + if shard_uid == the_only_shard_uid { + continue; + } + let (_, rc) = decode_value_with_rc(&value); + assert!(rc > 0); + let node_hash = CryptoHash::try_from_slice(&key[8..]).unwrap(); + store_update.decrement_refcount_by(shard_uid, &node_hash, NonZero::new(rc as u32).unwrap()); + } + store_update.commit().unwrap(); +} + +/// Asserts that all other shards State except `the_only_shard_uid` have been cleaned-up. +fn check_has_the_only_shard_state(client: &Client, the_only_shard_uid: ShardUId) { + let store = client.chain.chain_store.store().trie_store(); + let mut shard_uid_prefixes = HashSet::new(); + for kv in store.store().iter_raw_bytes(DBCol::State) { + let (key, _) = kv.unwrap(); + let shard_uid = ShardUId::try_from_slice(&key[0..8]).unwrap(); + shard_uid_prefixes.insert(shard_uid); + } + let shard_uid_prefixes = shard_uid_prefixes.into_iter().collect_vec(); + assert_eq!(shard_uid_prefixes, [the_only_shard_uid]); +} + +// Loop action testing state cleanup after resharding. +// It assumes single shard tracking and it waits for gc after resharding. 
+// Then it checks whether the last shard tracked by the client +// is the only ShardUId prefix for nodes in the State column. +pub(crate) fn check_state_cleanup_after_resharding( + tracked_shard_schedule: TrackedShardSchedule, +) -> LoopAction { + let client_index = tracked_shard_schedule.client_index; + let latest_height = Cell::new(0); + let target_height = Cell::new(None); + + let (done, succeeded) = LoopAction::shared_success_flag(); + let action_fn = Box::new( + move |node_datas: &[TestData], test_loop_data: &mut TestLoopData, _: AccountId| { + if done.get() { + return; + } + + let client_handle = node_datas[client_index].client_sender.actor_handle(); + let client = &test_loop_data.get_mut(&client_handle).client; + let tip = client.chain.head().unwrap(); + + // Run this action only once at every block height. + if latest_height.get() == tip.height { + return; + } + + let epoch_height = client + .epoch_manager + .get_epoch_height_from_prev_block(&tip.prev_block_hash) + .unwrap(); + let [tracked_shard_id] = + tracked_shard_schedule.schedule[epoch_height as usize].clone().try_into().unwrap(); + let tracked_shard_uid = + client.epoch_manager.shard_id_to_uid(tracked_shard_id, &tip.epoch_id).unwrap(); + + if latest_height.get() == 0 { + // This is beginning of the test, and the first epoch after genesis has height 1. + assert_eq!(epoch_height, 1); + // Get rid of the part of the Genesis State other than the shard we initially track. + retain_the_only_shard_state(client, tracked_shard_uid); + } + latest_height.set(tip.height); + + if target_height.get().is_none() { + if !this_block_has_new_shard_layout(client.epoch_manager.as_ref(), &tip) { + return; + } + // Just resharded. Set the target height high enough so that gc will kick in. + let epoch_length = client.config.epoch_length; + let gc_num_epochs_to_keep = client.config.gc.gc_num_epochs_to_keep; + target_height + .set(Some(latest_height.get() + (gc_num_epochs_to_keep + 1) * epoch_length)); + } + + if latest_height.get() < target_height.get().unwrap() { + return; + } + // At this point, we should only have State from the last tracked shard. 
+ check_has_the_only_shard_state(&client, tracked_shard_uid); + done.set(true); + }, + ); + LoopAction::new(action_fn, succeeded) +} diff --git a/integration-tests/src/test_loop/utils/trie_sanity.rs b/integration-tests/src/test_loop/utils/trie_sanity.rs index ca12fe83b35..31acd1e32cb 100644 --- a/integration-tests/src/test_loop/utils/trie_sanity.rs +++ b/integration-tests/src/test_loop/utils/trie_sanity.rs @@ -351,6 +351,7 @@ pub fn check_state_shard_uid_mapping_after_resharding( assert_eq!(children_shard_uids.len(), 2); let store = client.chain.chain_store.store().trie_store(); + let mut checked_any = false; for kv in store.store().iter_raw_bytes(DBCol::State) { let (key, value) = kv.unwrap(); let shard_uid = ShardUId::try_from_slice(&key[0..8]).unwrap(); @@ -359,6 +360,7 @@ pub fn check_state_shard_uid_mapping_after_resharding( if shard_uid != parent_shard_uid { continue; } + checked_any = true; let node_hash = CryptoHash::try_from_slice(&key[8..]).unwrap(); let (value, rc) = decode_value_with_rc(&value); // It is possible we have delayed receipts leftovers on disk, @@ -381,4 +383,5 @@ pub fn check_state_shard_uid_mapping_after_resharding( assert_eq!(&child_value.unwrap()[..], value.unwrap()); } } + assert!(checked_any); } diff --git a/integration-tests/src/tests/client/state_dump.rs b/integration-tests/src/tests/client/state_dump.rs index 5ca06f2d4bb..e3db99a5a9c 100644 --- a/integration-tests/src/tests/client/state_dump.rs +++ b/integration-tests/src/tests/client/state_dump.rs @@ -1,5 +1,6 @@ use assert_matches::assert_matches; +use near_async::futures::ActixFutureSpawner; use near_async::time::{Clock, Duration}; use near_chain::near_chain_primitives::error::QueryError; use near_chain::{ChainGenesis, ChainStoreAccess, Provenance}; @@ -66,6 +67,7 @@ fn slow_test_state_dump() { runtime, validator, dump_future_runner: StateSyncDumper::arbiter_dump_future_runner(), + future_spawner: Arc::new(ActixFutureSpawner), handle: None, }; state_sync_dumper.start().unwrap(); @@ -171,6 +173,7 @@ fn run_state_sync_with_dumped_parts( runtime, validator, dump_future_runner: StateSyncDumper::arbiter_dump_future_runner(), + future_spawner: Arc::new(ActixFutureSpawner), handle: None, }; state_sync_dumper.start().unwrap(); diff --git a/integration-tests/src/user/runtime_user.rs b/integration-tests/src/user/runtime_user.rs index e94f6eb7905..3d54b20b8b1 100644 --- a/integration-tests/src/user/runtime_user.rs +++ b/integration-tests/src/user/runtime_user.rs @@ -102,7 +102,10 @@ impl RuntimeUser { false, ) } else { - client.tries.get_trie_for_shard(ShardUId::single_shard(), client.state_root) + let shard_uid = ShardUId::single_shard(); + let mut trie = client.tries.get_trie_for_shard(shard_uid, client.state_root); + trie.set_charge_gas_for_trie_node_access(true); + trie }; let apply_result = client .runtime diff --git a/nearcore/Cargo.toml b/nearcore/Cargo.toml index b99d5574ced..931f63bdd7f 100644 --- a/nearcore/Cargo.toml +++ b/nearcore/Cargo.toml @@ -44,6 +44,7 @@ strum.workspace = true tempfile.workspace = true thiserror.workspace = true tokio.workspace = true +tokio-stream.workspace = true tracing.workspace = true xz2.workspace = true diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index c7619d6c7b3..acd35c35cc1 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -390,6 +390,7 @@ pub fn start_with_config_and_synchronization( let state_sync_runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()); + let state_sync_spawner = 
Arc::new(TokioRuntimeFutureSpawner(state_sync_runtime.clone())); let StartClientResult { client_actor, client_arbiter_handle, resharding_handle } = start_client( Clock::real(), config.client_config.clone(), @@ -398,7 +399,7 @@ pub fn start_with_config_and_synchronization( shard_tracker.clone(), runtime.clone(), node_id, - Arc::new(TokioRuntimeFutureSpawner(state_sync_runtime.clone())), + state_sync_spawner.clone(), network_adapter.as_multi_sender(), shards_manager_adapter.as_sender(), config.validator_signer.clone(), @@ -435,6 +436,7 @@ pub fn start_with_config_and_synchronization( runtime, validator: config.validator_signer.clone(), dump_future_runner: StateSyncDumper::arbiter_dump_future_runner(), + future_spawner: state_sync_spawner, handle: None, }; state_sync_dumper.start()?; diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs index 481fc530207..dc824731bd1 100644 --- a/nearcore/src/state_sync.rs +++ b/nearcore/src/state_sync.rs @@ -4,11 +4,11 @@ use actix_rt::Arbiter; use anyhow::Context; use borsh::BorshSerialize; use futures::future::BoxFuture; -use futures::FutureExt; -use itertools::Itertools; -use near_async::time::{Clock, Duration, Instant}; +use futures::{FutureExt, StreamExt}; +use near_async::futures::{respawn_for_parallelism, FutureSpawner}; +use near_async::time::{Clock, Duration, Interval}; use near_chain::types::RuntimeAdapter; -use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode, Error}; +use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode}; use near_chain_configs::{ClientConfig, ExternalStorageLocation, MutableValidatorSigner}; use near_client::sync::external::{ create_bucket_readwrite, external_storage_location, StateFileType, @@ -19,20 +19,25 @@ use near_client::sync::external::{ }; use near_epoch_manager::shard_tracker::ShardTracker; use near_epoch_manager::EpochManagerAdapter; +use near_primitives::block::BlockHeader; use near_primitives::hash::CryptoHash; use near_primitives::state_part::PartId; use near_primitives::state_sync::StateSyncDumpProgress; -use near_primitives::types::{AccountId, EpochHeight, EpochId, ShardId, StateRoot}; -use near_primitives::version::PROTOCOL_VERSION; -use rand::{thread_rng, Rng}; -use std::collections::HashSet; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; +use near_primitives::types::{EpochHeight, EpochId, ShardId, StateRoot}; +use rand::seq::SliceRandom; +use rand::thread_rng; +use std::collections::{HashMap, HashSet}; +use std::i64; +use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; +use std::sync::{Arc, RwLock}; +use tokio::sync::oneshot; +use tokio::sync::Semaphore; /// Time limit per state dump iteration. /// A node must check external storage for parts to dump again once time is up. pub const STATE_DUMP_ITERATION_TIME_LIMIT_SECS: u64 = 300; +// TODO: could refactor this further and just have one "Dumper" struct here pub struct StateSyncDumper { pub clock: Clock, pub client_config: ClientConfig, @@ -45,41 +50,13 @@ pub struct StateSyncDumper { /// Please note that the locked value should not be stored anywhere or passed through the thread boundary. pub validator: MutableValidatorSigner, pub dump_future_runner: Box) -> Box>, + pub future_spawner: Arc, pub handle: Option, } impl StateSyncDumper { - /// Returns all current ShardIDs, plus any that may belong to a future epoch after a protocol upgrade - /// For now we start a thread for each shard ID even if it won't be needed for a long time. - /// TODO(resharding): fix that, and handle the dynamic resharding case. 
- fn get_all_shard_ids(&self) -> anyhow::Result> { - let chain = Chain::new_for_view_client( - self.clock.clone(), - self.epoch_manager.clone(), - self.shard_tracker.clone(), - self.runtime.clone(), - &self.chain_genesis, - DoomslugThresholdMode::TwoThirds, - false, - ) - .context("failed creating Chain")?; - let epoch_id = chain.head().context("failed getting chain head")?.epoch_id; - let head_protocol_version = self - .epoch_manager - .get_epoch_protocol_version(&epoch_id) - .context("failed getting epoch protocol version")?; - - let mut shard_ids = HashSet::new(); - for protocol_version in head_protocol_version..=PROTOCOL_VERSION { - let shard_layout = - self.epoch_manager.get_shard_layout_from_protocol_version(protocol_version); - shard_ids.extend(shard_layout.shard_ids()); - } - Ok(shard_ids) - } - - /// Starts one a thread per tracked shard. - /// Each started thread will be dumping state parts of a single epoch to external storage. + /// Starts a thread that periodically checks whether any new parts need to be uploaded, and then spawns + /// futures to generate and upload them pub fn start(&mut self) -> anyhow::Result<()> { assert!(self.handle.is_none(), "StateSyncDumper already started"); @@ -115,49 +92,43 @@ impl StateSyncDumper { }, }; - // Determine how many threads to start. - let shard_ids = self.get_all_shard_ids()?; - let chain_id = self.client_config.chain_id.clone(); let keep_running = Arc::new(AtomicBool::new(true)); - // Start a thread for each shard. - let handles = shard_ids - .into_iter() - .map(|shard_id| { - let runtime = self.runtime.clone(); - let chain_genesis = self.chain_genesis.clone(); - // Sadly, `Chain` is not `Send` and each thread needs to create its own `Chain` instance. - let chain = Chain::new_for_view_client( - self.clock.clone(), - self.epoch_manager.clone(), - self.shard_tracker.clone(), - runtime.clone(), - &chain_genesis, - DoomslugThresholdMode::TwoThirds, - false, - ) - .unwrap(); - (self.dump_future_runner)( - state_sync_dump( - self.clock.clone(), - shard_id, - chain, - self.epoch_manager.clone(), - self.shard_tracker.clone(), - runtime.clone(), - chain_id.clone(), - dump_config.restart_dump_for_shards.clone().unwrap_or_default(), - external.clone(), - dump_config.iteration_delay.unwrap_or(Duration::seconds(10)), - self.validator.clone(), - keep_running.clone(), - ) - .boxed(), - ) - }) - .collect(); - self.handle = Some(StateSyncDumpHandle { handles, keep_running }); + let chain = Chain::new_for_view_client( + self.clock.clone(), + self.epoch_manager.clone(), + self.shard_tracker.clone(), + self.runtime.clone(), + &self.chain_genesis, + DoomslugThresholdMode::TwoThirds, + false, + ) + .unwrap(); + if let Some(shards) = dump_config.restart_dump_for_shards.as_ref() { + for shard_id in shards { + chain.chain_store().set_state_sync_dump_progress(*shard_id, None).unwrap(); + tracing::debug!(target: "state_sync_dump", ?shard_id, "Dropped existing progress"); + } + } + let handle = (self.dump_future_runner)( + do_state_sync_dump( + self.clock.clone(), + chain, + self.epoch_manager.clone(), + self.shard_tracker.clone(), + self.runtime.clone(), + chain_id, + external, + dump_config.iteration_delay.unwrap_or(Duration::seconds(10)), + self.validator.clone(), + keep_running.clone(), + self.future_spawner.clone(), + ) + .boxed(), + ); + + self.handle = Some(StateSyncDumpHandle { handle: Some(handle), keep_running }); Ok(()) } @@ -179,7 +150,7 @@ impl StateSyncDumper { /// Holds arbiter handles controlling the lifetime of the spawned threads. 
pub struct StateSyncDumpHandle { - pub handles: Vec>, + pub handle: Option>, keep_running: Arc, } @@ -192,25 +163,11 @@ impl Drop for StateSyncDumpHandle { impl StateSyncDumpHandle { fn stop(&mut self) { tracing::warn!(target: "state_sync_dump", "Stopping state dumper"); - self.keep_running.store(false, std::sync::atomic::Ordering::Relaxed); - self.handles.drain(..).for_each(|dropper| { - dropper(); - }); + self.keep_running.store(false, Ordering::Relaxed); + self.handle.take().unwrap()() } } -/// Fetches the state sync header from DB and serializes it. -fn get_serialized_header( - shard_id: ShardId, - sync_hash: CryptoHash, - chain: &Chain, -) -> anyhow::Result> { - let header = chain.get_state_response_header(shard_id, sync_hash)?; - let mut buffer: Vec = Vec::new(); - header.serialize(&mut buffer)?; - Ok(buffer) -} - pub fn extract_part_id_from_part_file_name(file_name: &String) -> u64 { assert!(is_part_filename(file_name)); return get_part_id_from_filename(file_name).unwrap(); @@ -218,12 +175,15 @@ pub fn extract_part_id_from_part_file_name(file_name: &String) -> u64 { async fn get_missing_part_ids_for_epoch( shard_id: ShardId, - chain_id: &String, + chain_id: &str, epoch_id: &EpochId, epoch_height: u64, total_parts: u64, external: &ExternalConnection, -) -> Result, anyhow::Error> { +) -> Result, anyhow::Error> { + if total_parts == 0 { + return Ok(HashSet::new()); + } let directory_path = external_storage_location_directory( chain_id, epoch_id, @@ -237,432 +197,863 @@ async fn get_missing_part_ids_for_epoch( .iter() .map(|file_name| extract_part_id_from_part_file_name(file_name)) .collect(); - let missing_nums: Vec = + let missing_nums: HashSet<_> = (0..total_parts).filter(|i| !existing_nums.contains(i)).collect(); let num_missing = missing_nums.len(); tracing::debug!(target: "state_sync_dump", ?num_missing, ?directory_path, "Some parts have already been dumped."); Ok(missing_nums) } else { tracing::debug!(target: "state_sync_dump", ?total_parts, ?directory_path, "No part has been dumped."); - let missing_nums = (0..total_parts).collect::>(); + let missing_nums = (0..total_parts).collect(); Ok(missing_nums) } } -fn select_random_part_id_with_index(parts_to_be_dumped: &Vec) -> (u64, usize) { - let mut rng = thread_rng(); - let selected_idx = rng.gen_range(0..parts_to_be_dumped.len()); - let selected_element = parts_to_be_dumped[selected_idx]; - tracing::debug!(target: "state_sync_dump", ?selected_element, "selected parts to dump: "); - (selected_element, selected_idx) +// State associated with dumping a shard's state +struct ShardDump { + state_root: StateRoot, + // None if it's already been dumped + header_to_dump: Option>, + num_parts: u64, + parts_dumped: Arc, + // This is the set of parts who have an associated file stored in the ExternalConnection, + // meaning they've already been dumped. We periodically check this (since other processes/machines + // might have uploaded parts that we didn't) and avoid duplicating work for those parts that have already been updated. + parts_missing: Arc>>, + // This will give Ok(()) when they're all done, or Err() when one gives an error + // For now the tasks never fail, since we just retry all errors like the old implementation did, + // but we probably want to make a change to distinguish which errors are actually retriable + // (e.g. 
the state snapshot isn't ready yet) + upload_parts: oneshot::Receiver>, } -enum StateDumpAction { - Wait, - Dump { epoch_id: EpochId, epoch_height: EpochHeight, sync_hash: CryptoHash }, +// State associated with dumping an epoch's state +struct DumpState { + epoch_id: EpochId, + epoch_height: EpochHeight, + sync_prev_prev_hash: CryptoHash, + // Contains state for each shard we need to dump. We remove shard IDs from + // this map as we finish them. + dump_state: HashMap, + canceled: Arc, } -fn get_current_state( - chain: &Chain, - shard_id: &ShardId, - shard_tracker: &ShardTracker, - account_id: &Option, - epoch_manager: Arc, -) -> Result { - let was_last_epoch_done = match chain.chain_store().get_state_sync_dump_progress(*shard_id) { - Ok(StateSyncDumpProgress::AllDumped { epoch_id, .. }) => Some(epoch_id), - Ok(StateSyncDumpProgress::Skipped { epoch_id, .. }) => Some(epoch_id), - _ => None, - }; - - let maybe_latest_epoch_info = get_latest_epoch(shard_id, &chain, epoch_manager.clone()); - let latest_epoch_info = match maybe_latest_epoch_info { - Ok(latest_epoch_info) => latest_epoch_info, - Err(err) => { - tracing::error!(target: "state_sync_dump", ?shard_id, ?err, "Failed to get the latest epoch"); - return Err(err); +impl DumpState { + /// For each shard, checks the filenames that exist in `external` and sets the corresponding `parts_missing` fields + /// to contain the parts that haven't yet been uploaded, so that we only try to generate those. + async fn set_missing_parts(&self, external: &ExternalConnection, chain_id: &str) { + for (shard_id, s) in self.dump_state.iter() { + match get_missing_part_ids_for_epoch( + *shard_id, + chain_id, + &self.epoch_id, + self.epoch_height, + s.num_parts, + external, + ) + .await + { + Ok(missing) => { + *s.parts_missing.write().unwrap() = missing; + } + Err(error) => { + tracing::error!(target: "state_sync_dump", ?error, ?shard_id, "Failed to list stored state parts."); + } + } } - }; - let Some(LatestEpochInfo { - epoch_id: new_epoch_id, - epoch_height: new_epoch_height, - sync_hash: new_sync_hash, - }) = latest_epoch_info - else { - return Ok(StateDumpAction::Wait); - }; - - if Some(&new_epoch_id) == was_last_epoch_done.as_ref() { - return Ok(StateDumpAction::Wait); } +} - let shard_layout = epoch_manager.get_shard_layout(&new_epoch_id)?; +// Represents the state of the current epoch's state part dump +enum CurrentDump { + None, + InProgress(DumpState), + Done(EpochId), +} - if shard_layout.shard_ids().contains(shard_id) - && cares_about_shard(chain, shard_id, &new_sync_hash, &shard_tracker, &account_id)? - { - Ok(StateDumpAction::Dump { - epoch_id: new_epoch_id, - epoch_height: new_epoch_height, - sync_hash: new_sync_hash, - }) - } else { - Ok(StateDumpAction::Wait) - } +// Helper type used as an intermediate return value where the caller will want the sender only +// if there's something to do +enum NewDump { + Dump(DumpState, HashMap>>), + NoTrackedShards, } -/// Uploads header to external storage. -/// Returns true if it was successful. -async fn upload_state_header( - chain_id: &String, - epoch_id: &EpochId, - epoch_height: u64, +/// State associated with dumps for all shards responsible for checking when we need to dump for a new epoch +/// The basic flow is as follows: +/// +/// At startup or when we enter a new epoch, we initialize the `current_dump` field to represent the current epoch's state dump. 
+/// Then for each shard that we track and want to dump state for, we'll have one `ShardDump` struct representing it stored in the
+/// `DumpState` struct that holds the global state. First we upload headers if they're not already present in the external storage, and
+/// then we start the part uploading by calling `start_upload_parts()`. This initializes one `PartUploader` struct for each shard_id and part_id,
+/// and spawns a PartUploader::upload_state_part() future for each, which will be responsible for generating and uploading that part if it's not
+/// already uploaded. When all the parts for a shard have been uploaded, we'll be notified by the `upload_parts` field of the associated
+/// `ShardDump` struct, which we check in `check_parts_upload()`.
+///
+/// Separately, every so often we check whether there's a new epoch to dump state for (in `check_head()`) and whether other processes
+/// have uploaded some state parts that we can therefore skip (in `check_stored_parts()`).
+struct StateDumper {
+    clock: Clock,
+    chain_id: String,
+    validator: MutableValidatorSigner,
+    shard_tracker: ShardTracker,
+    chain: Chain,
+    epoch_manager: Arc<dyn EpochManagerAdapter>,
+    runtime: Arc<dyn RuntimeAdapter>,
+    // State associated with dumping the current epoch
+    current_dump: CurrentDump,
+    external: ExternalConnection,
+    future_spawner: Arc<dyn FutureSpawner>,
+    // Used to limit how many tasks can be doing the computation-heavy state part generation at a time
+    obtain_parts: Arc<Semaphore>,
+}
+
+// Stores needed data for use in part upload futures
+struct PartUploader {
+    clock: Clock,
+    external: ExternalConnection,
+    runtime: Arc<dyn RuntimeAdapter>,
+    chain_id: String,
+    epoch_id: EpochId,
+    epoch_height: EpochHeight,
+    sync_prev_prev_hash: CryptoHash,
     shard_id: ShardId,
-    state_sync_header: anyhow::Result<Vec<u8>>,
-    external: &ExternalConnection,
-) -> bool {
-    match state_sync_header {
-        Err(err) => {
-            tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to serialize header.");
-            false
+    state_root: StateRoot,
+    num_parts: u64,
+    // Used for setting the num_parts_dumped gauge metric (which is an i64)
+    // When part upload tasks are cancelled on a new epoch, this is set to -1 so tasks
+    // know not to touch that metric anymore.
+    parts_dumped: Arc<AtomicI64>,
+    parts_missing: Arc<RwLock<HashSet<u64>>>,
+    obtain_parts: Arc<Semaphore>,
+    canceled: Arc<AtomicBool>,
+}
+
+impl PartUploader {
+    /// Increment the STATE_SYNC_DUMP_NUM_PARTS_DUMPED metric
+    fn inc_parts_dumped(&self) {
+        match self.parts_dumped.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
+            if prev >= 0 {
+                Some(prev + 1)
+            } else {
+                None
+            }
+        }) {
+            Ok(prev_parts_dumped) => {
+                metrics::STATE_SYNC_DUMP_NUM_PARTS_DUMPED
+                    .with_label_values(&[&self.shard_id.to_string()])
+                    .set(prev_parts_dumped + 1);
+            }
+            Err(_) => {}
+        };
+    }
+
+    /// Attempt to generate the state part for `self.epoch_id`, `self.shard_id` and `part_idx`, and upload it to
+    /// the external storage. The state part generation is limited by the number of permits allocated to the `obtain_parts`
+    /// Semaphore. For now, this always returns Ok(()) (loops forever retrying in case of errors), but this should be changed
+    /// to return Err() if the error is not going to be retriable.
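+    ///
+    /// If the part is not in `parts_missing`, it is treated as already uploaded and we only bump the
+    /// parts-dumped metric. `canceled` is checked before each generation and upload attempt, so tasks
+    /// belonging to a superseded epoch exit early without touching external storage.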
+ async fn upload_state_part(self: Arc, part_idx: u64) -> anyhow::Result<()> { + if !self.parts_missing.read().unwrap().contains(&part_idx) { + self.inc_parts_dumped(); + return Ok(()); } - Ok(header) => { - let file_type = StateFileType::StateHeader; - let location = - external_storage_location(&chain_id, &epoch_id, epoch_height, shard_id, &file_type); - match external.put_file(file_type, &header, shard_id, &location).await { - Err(err) => { - tracing::warn!(target: "state_sync_dump", ?shard_id, epoch_height, ?err, "Failed to put header into external storage. Will retry next iteration."); - false + let part_id = PartId::new(part_idx, self.num_parts); + + let state_part = loop { + if self.canceled.load(Ordering::Relaxed) { + return Ok(()); + } + let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED + .with_label_values(&[&self.shard_id.to_string()]) + .start_timer(); + let state_part = { + let _permit = self.obtain_parts.acquire().await.unwrap(); + self.runtime.obtain_state_part( + self.shard_id, + &self.sync_prev_prev_hash, + &self.state_root, + part_id, + ) + }; + match state_part { + Ok(state_part) => { + break state_part; } - Ok(_) => { - tracing::trace!(target: "state_sync_dump", ?shard_id, epoch_height, "Header saved to external storage."); - true + Err(error) => { + // TODO: return non retriable errors. + tracing::warn!( + target: "state_sync_dump", + shard_id = %self.shard_id, epoch_height=%self.epoch_height, epoch_id=?&self.epoch_id, ?part_id, ?error, + "Failed to obtain state part. Retrying in 200 millis." + ); + self.clock.sleep(Duration::milliseconds(200)).await; + continue; + } + } + }; + + let file_type = StateFileType::StatePart { part_id: part_idx, num_parts: self.num_parts }; + let location = external_storage_location( + &self.chain_id, + &self.epoch_id, + self.epoch_height, + self.shard_id, + &file_type, + ); + loop { + if self.canceled.load(Ordering::Relaxed) { + return Ok(()); + } + match self + .external + .put_file(file_type.clone(), &state_part, self.shard_id, &location) + .await + { + Ok(()) => { + self.inc_parts_dumped(); + metrics::STATE_SYNC_DUMP_SIZE_TOTAL + .with_label_values(&[ + &self.epoch_height.to_string(), + &self.shard_id.to_string(), + ]) + .inc_by(state_part.len() as u64); + tracing::debug!(target: "state_sync_dump", shard_id = %self.shard_id, epoch_height=%self.epoch_height, epoch_id=?&self.epoch_id, ?part_id, "Uploaded state part."); + return Ok(()); + } + Err(error) => { + tracing::warn!( + target: "state_sync_dump", shard_id = %self.shard_id, epoch_height=%self.epoch_height, epoch_id=?&self.epoch_id, ?part_id, ?error, + "Failed to upload state part. Retrying in 200 millis." 
+ ); + self.clock.sleep(Duration::milliseconds(200)).await; + continue; } } } } } -const FAILURES_ALLOWED_PER_ITERATION: u32 = 10; - -async fn state_sync_dump( +// Stores needed data for use in header upload futures +struct HeaderUploader { clock: Clock, - shard_id: ShardId, - chain: Chain, - epoch_manager: Arc, - shard_tracker: ShardTracker, - runtime: Arc, - chain_id: String, - restart_dump_for_shards: Vec, external: ExternalConnection, - iteration_delay: Duration, - validator: MutableValidatorSigner, - keep_running: Arc, -) { - tracing::info!(target: "state_sync_dump", ?shard_id, "Running StateSyncDump loop"); + chain_id: String, + epoch_id: EpochId, + epoch_height: EpochHeight, +} - if restart_dump_for_shards.contains(&shard_id) { - tracing::debug!(target: "state_sync_dump", ?shard_id, "Dropped existing progress"); - chain.chain_store().set_state_sync_dump_progress(shard_id, None).unwrap(); +impl HeaderUploader { + /// Attempt to generate the state header for `self.epoch_id` and `self.shard_id`, and upload it to + /// the external storage. For now, this always returns OK(()) (loops forever retrying in case of errors), + /// but this should be changed to return Err() if the error is not going to be retriable. + async fn upload_header(self: Arc, shard_id: ShardId, header: Option>) { + let Some(header) = header else { + return; + }; + let file_type = StateFileType::StateHeader; + let location = external_storage_location( + &self.chain_id, + &self.epoch_id, + self.epoch_height, + shard_id, + &file_type, + ); + loop { + match self.external.put_file(file_type.clone(), &header, shard_id, &location).await { + Ok(_) => { + tracing::info!( + target: "state_sync_dump", %shard_id, epoch_height = %self.epoch_height, + "Header saved to external storage." + ); + return; + } + Err(err) => { + tracing::warn!( + target: "state_sync_dump", %shard_id, epoch_height = %self.epoch_height, ?err, + "Failed to put header into external storage. Will retry next iteration." + ); + self.clock.sleep(Duration::seconds(5)).await; + continue; + } + }; + } } - // Stop if the node is stopped. - // Note that without this check the state dumping thread is unstoppable, i.e. non-interruptable. 
- while keep_running.load(std::sync::atomic::Ordering::Relaxed) { - tracing::debug!(target: "state_sync_dump", ?shard_id, "Running StateSyncDump loop iteration"); - let account_id = validator.get().map(|v| v.validator_id().clone()); - let current_state = get_current_state( - &chain, - &shard_id, - &shard_tracker, - &account_id, - epoch_manager.clone(), - ); - let next_state = match current_state { + /// Returns whether the state sync header for `self.epoch_id` and `self.shard_id` is already uploaded to the + /// external storage + async fn header_stored(self: Arc, shard_id: ShardId) -> bool { + match self + .external + .is_state_sync_header_stored_for_epoch( + shard_id, + &self.chain_id, + &self.epoch_id, + self.epoch_height, + ) + .await + { + Ok(stored) => stored, Err(err) => { - tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to get the current state"); - None + tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to determine header presence in external storage."); + false } - Ok(StateDumpAction::Wait) => None, - Ok(StateDumpAction::Dump { epoch_id, epoch_height, sync_hash }) => { - let in_progress_data = get_in_progress_data(shard_id, sync_hash, &chain); - match in_progress_data { - Err(err) => { - tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to get in progress data"); - None - } - Ok((state_root, num_parts, sync_prev_prev_hash)) => { - // Upload header - let header_in_external_storage = match external - .is_state_sync_header_stored_for_epoch( - shard_id, - &chain_id, - &epoch_id, - epoch_height, - ) - .await - { - Err(err) => { - tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to determine header presence in external storage."); - false - } - // Header is already stored - Ok(true) => true, - // Header is missing - Ok(false) => { - upload_state_header( - &chain_id, - &epoch_id, - epoch_height, - shard_id, - get_serialized_header(shard_id, sync_hash, &chain), - &external, - ) - .await - } - }; - - let header_upload_status = if header_in_external_storage { - None - } else { - Some(StateSyncDumpProgress::InProgress { - epoch_id: epoch_id, - epoch_height, - sync_hash, - }) - }; - - // Upload parts - let parts_upload_status = match get_missing_part_ids_for_epoch( - shard_id, - &chain_id, - &epoch_id, - epoch_height, - num_parts, - &external, - ) - .await - { - Err(err) => { - tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to determine missing parts"); - None - } - Ok(missing_parts) if missing_parts.is_empty() => { - update_dumped_size_and_cnt_metrics( - &shard_id, - epoch_height, - None, - num_parts, - num_parts, - ); - Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height }) - } - Ok(missing_parts) => { - let mut parts_to_dump = missing_parts.clone(); - let timer = Instant::now(); - let mut dumped_any_state_part = false; - let mut failures_cnt = 0; - // Stop if the node is stopped. - // Note that without this check the state dumping thread is unstoppable, i.e. non-interruptable. 
- while keep_running.load(std::sync::atomic::Ordering::Relaxed) - && timer.elapsed().as_secs() - <= STATE_DUMP_ITERATION_TIME_LIMIT_SECS - && !parts_to_dump.is_empty() - && failures_cnt < FAILURES_ALLOWED_PER_ITERATION - { - let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED - .with_label_values(&[&shard_id.to_string()]) - .start_timer(); - - let (part_id, selected_idx) = - select_random_part_id_with_index(&parts_to_dump); - - let state_part = runtime.obtain_state_part( - shard_id, - &sync_prev_prev_hash, - &state_root, - PartId::new(part_id, num_parts), - ); - let state_part = match state_part { - Ok(state_part) => state_part, - Err(err) => { - tracing::warn!(target: "state_sync_dump", ?shard_id, epoch_height, part_id, ?err, "Failed to obtain and store part. Will skip this part."); - failures_cnt += 1; - continue; - } - }; - - let file_type = StateFileType::StatePart { part_id, num_parts }; - let location = external_storage_location( - &chain_id, - &epoch_id, - epoch_height, - shard_id, - &file_type, - ); - if let Err(err) = external - .put_file(file_type, &state_part, shard_id, &location) - .await - { - // no need to break if there's an error, we should keep dumping other parts. - // reason is we are dumping random selected parts, so it's fine if we are not able to finish all of them - tracing::warn!(target: "state_sync_dump", ?shard_id, epoch_height, part_id, ?err, "Failed to put a store part into external storage. Will skip this part."); - failures_cnt += 1; - continue; - } - - // Remove the dumped part from parts_to_dump so that we draw without replacement. - parts_to_dump.swap_remove(selected_idx); - update_dumped_size_and_cnt_metrics( - &shard_id, - epoch_height, - Some(state_part.len()), - num_parts.checked_sub(parts_to_dump.len() as u64).unwrap(), - num_parts, - ); - dumped_any_state_part = true; - } - if parts_to_dump.is_empty() { - Some(StateSyncDumpProgress::AllDumped { - epoch_id, - epoch_height, - }) - } else if dumped_any_state_part { - Some(StateSyncDumpProgress::InProgress { - epoch_id, - epoch_height, - sync_hash, - }) - } else { - // No progress made. Wait before retrying. - None - } - } - }; - match (&parts_upload_status, &header_upload_status) { - ( - Some(StateSyncDumpProgress::AllDumped { .. }), - Some(StateSyncDumpProgress::InProgress { .. }), - ) => header_upload_status, - _ => parts_upload_status, - } - } - } + } + } +} + +impl StateDumper { + fn new( + clock: Clock, + chain_id: String, + validator: MutableValidatorSigner, + shard_tracker: ShardTracker, + chain: Chain, + epoch_manager: Arc, + runtime: Arc, + external: ExternalConnection, + future_spawner: Arc, + ) -> Self { + Self { + clock, + chain_id, + validator, + shard_tracker, + chain, + epoch_manager, + runtime, + current_dump: CurrentDump::None, + external, + future_spawner, + obtain_parts: Arc::new(Semaphore::new(4)), + } + } + + fn get_block_header(&self, hash: &CryptoHash) -> anyhow::Result { + self.chain.get_block_header(hash).with_context(|| format!("Failed getting header {}", hash)) + } + + /// Reads the DB entries starting with `STATE_SYNC_DUMP_KEY`, and checks which ShardIds and EpochIds are indicated as + /// already having been fully dumped. For each shard ID whose state for `epoch_id` has already been dumped, we remove it + /// from `dump` and `senders` so that we don't start the state dump logic for it. 
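+    /// Any progress entry that refers to an older epoch is cleared from the DB so it doesn't get
+    /// picked up again later.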
+ fn check_old_progress( + &mut self, + epoch_id: &EpochId, + dump: &mut DumpState, + senders: &mut HashMap>>, + ) -> anyhow::Result<()> { + for res in self.chain.chain_store().iter_state_sync_dump_progress() { + let (shard_id, (dumped_epoch_id, done)) = + res.context("failed iterating over stored dump progress")?; + if &dumped_epoch_id != epoch_id { + self.chain + .chain_store() + .set_state_sync_dump_progress(shard_id, None) + .context("failed setting state dump progress")?; + } else if done { + dump.dump_state.remove(&shard_id); + senders.remove(&shard_id); } + } + Ok(()) + } + + /// Returns the `sync_hash` header corresponding to the latest final block if it's already known. + fn latest_sync_header(&self) -> anyhow::Result> { + let head = self.chain.head().context("Failed getting chain head")?; + let header = self.get_block_header(&head.last_block_hash)?; + let final_hash = header.last_final_block(); + if final_hash == &CryptoHash::default() { + return Ok(None); + } + let Some(sync_hash) = self + .chain + .get_sync_hash(final_hash) + .with_context(|| format!("Failed getting sync hash for {}", &final_hash))? + else { + return Ok(None); }; + self.get_block_header(&sync_hash).map(Some) + } - // Record the next state of the state machine. - let has_progress = match next_state { - Some(next_state) => { - tracing::debug!(target: "state_sync_dump", ?shard_id, ?next_state); - match chain.chain_store().set_state_sync_dump_progress(shard_id, Some(next_state)) { - Ok(_) => true, - Err(err) => { - // This will be retried. - tracing::debug!(target: "state_sync_dump", ?shard_id, ?err, "Failed to set progress"); - false - } - } + /// Generates the state sync header for the shard and initializes the `ShardDump` struct which + /// will be used to keep track of what's been dumped so far for this shard. + fn get_shard_dump( + &self, + shard_id: ShardId, + sync_hash: &CryptoHash, + ) -> anyhow::Result<(ShardDump, oneshot::Sender>)> { + let state_header = + self.chain.get_state_response_header(shard_id, *sync_hash).with_context(|| { + format!("Failed getting state response header for {} {}", shard_id, sync_hash) + })?; + let state_root = state_header.chunk_prev_state_root(); + let num_parts = state_header.num_state_parts(); + metrics::STATE_SYNC_DUMP_NUM_PARTS_TOTAL + .with_label_values(&[&shard_id.to_string()]) + .set(num_parts.try_into().unwrap_or(i64::MAX)); + + let mut header_bytes: Vec = Vec::new(); + state_header.serialize(&mut header_bytes)?; + let (sender, receiver) = oneshot::channel(); + Ok(( + ShardDump { + state_root, + header_to_dump: Some(header_bytes), + num_parts, + parts_dumped: Arc::new(AtomicI64::new(0)), + parts_missing: Arc::new(RwLock::new((0..num_parts).collect())), + upload_parts: receiver, + }, + sender, + )) + } + + /// Initializes a `NewDump` struct, which is a helper return value that either returns `NoTrackedShards` + /// if we're not tracking anything, or a `DumpState` struct, which holds one `ShardDump` initialized by `get_shard_dump()` + /// for each shard that we track. This, and the associated oneshot::Senders will then hold all the state related to the + /// progress of dumping the current epoch's state. This is to be called at startup and also upon each new epoch. 
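+    /// Shards that the node doesn't track for the sync block are skipped, and the per-shard epoch height
+    /// and total-parts metrics are initialized here.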
+ fn get_dump_state(&mut self, sync_header: &BlockHeader) -> anyhow::Result { + let epoch_info = self + .epoch_manager + .get_epoch_info(sync_header.epoch_id()) + .with_context(|| format!("Failed getting epoch info {:?}", sync_header.epoch_id()))?; + let sync_prev_header = self.get_block_header(sync_header.prev_hash())?; + let sync_prev_prev_hash = *sync_prev_header.prev_hash(); + let shard_ids = self + .epoch_manager + .shard_ids(sync_header.epoch_id()) + .with_context(|| format!("Failed getting shard IDs {:?}", sync_header.epoch_id()))?; + + let v = self.validator.get(); + let account_id = v.as_ref().map(|v| v.validator_id()); + let mut dump_state = HashMap::new(); + let mut senders = HashMap::new(); + for shard_id in shard_ids { + if !self.shard_tracker.care_about_shard( + account_id, + sync_header.prev_hash(), + shard_id, + true, + ) { + tracing::debug!( + target: "state_sync_dump", epoch_height = %epoch_info.epoch_height(), epoch_id = ?sync_header.epoch_id(), %shard_id, + "Not dumping state for non-tracked shard." + ); + continue; } - None => { - // Nothing to do, will check again later. - tracing::debug!(target: "state_sync_dump", ?shard_id, "Idle"); - false + metrics::STATE_SYNC_DUMP_EPOCH_HEIGHT + .with_label_values(&[&shard_id.to_string()]) + .set(epoch_info.epoch_height().try_into().unwrap_or(i64::MAX)); + + let (shard_dump, sender) = self.get_shard_dump(shard_id, sync_header.hash())?; + dump_state.insert(shard_id, shard_dump); + senders.insert(shard_id, sender); + } + assert_eq!( + dump_state.keys().collect::>(), + senders.keys().collect::>() + ); + if dump_state.is_empty() { + tracing::warn!( + target: "state_sync_dump", epoch_height = %epoch_info.epoch_height(), epoch_id = ?sync_header.epoch_id(), + "Not doing anything for the current epoch. No shards tracked." + ); + return Ok(NewDump::NoTrackedShards); + } + Ok(NewDump::Dump( + DumpState { + epoch_id: *sync_header.epoch_id(), + epoch_height: epoch_info.epoch_height(), + sync_prev_prev_hash, + dump_state, + canceled: Arc::new(AtomicBool::new(false)), + }, + senders, + )) + } + + /// For each shard we're dumping state for, check whether the state sync header is already stored in the external storage, + /// and set `header_to_dump` to None if so, so we don't waste time uploading it again. + async fn check_stored_headers(&mut self, dump: &mut DumpState) -> anyhow::Result<()> { + let uploader = Arc::new(HeaderUploader { + clock: self.clock.clone(), + external: self.external.clone(), + chain_id: self.chain_id.clone(), + epoch_id: dump.epoch_id, + epoch_height: dump.epoch_height, + }); + let shards = dump + .dump_state + .iter() + .map(|(shard_id, _)| (uploader.clone(), *shard_id)) + .collect::>(); + let headers_stored = tokio_stream::iter(shards) + .filter_map(|(uploader, shard_id)| async move { + let stored = uploader.header_stored(shard_id).await; + if stored { + Some(futures::future::ready(shard_id)) + } else { + None + } + }) + .buffer_unordered(10) + .collect::>() + .await; + for shard_id in headers_stored { + tracing::info!( + target: "state_sync_dump", %shard_id, epoch_height = %dump.epoch_height, + "Header already saved to external storage." 
+ ); + let s = dump.dump_state.get_mut(&shard_id).unwrap(); + s.header_to_dump = None; + } + Ok(()) + } + + /// try to upload the state sync header for each shard we're dumping state for + async fn store_headers(&mut self, dump: &mut DumpState) -> anyhow::Result<()> { + let uploader = Arc::new(HeaderUploader { + clock: self.clock.clone(), + external: self.external.clone(), + chain_id: self.chain_id.clone(), + epoch_id: dump.epoch_id, + epoch_height: dump.epoch_height, + }); + let headers = dump + .dump_state + .iter_mut() + .map(|(shard_id, shard_dump)| { + (uploader.clone(), *shard_id, shard_dump.header_to_dump.take()) + }) + .collect::>(); + + tokio_stream::iter(headers) + .map(|(uploader, shard_id, header)| async move { + uploader.upload_header(shard_id, header).await + }) + .buffer_unordered(10) + .collect::<()>() + .await; + + Ok(()) + } + + /// Start uploading state parts. For each shard we're dumping state for and each state part in that shard, this + /// starts one PartUploader::upload_state_part() future. It also starts one future that will examine the results + /// of those futures as they finish, and that will send on `senders` either the first error that occurs or Ok(()) + /// when all parts have been uploaded for the shard. + async fn start_upload_parts( + &mut self, + senders: HashMap>>, + dump: &DumpState, + ) { + let mut senders = senders + .into_iter() + .map(|(shard_id, sender)| { + let d = dump.dump_state.get(&shard_id).unwrap(); + (shard_id, (sender, d.num_parts)) + }) + .collect::>(); + let mut empty_shards = HashSet::new(); + let uploaders = dump + .dump_state + .iter() + .filter_map(|(shard_id, shard_dump)| { + metrics::STATE_SYNC_DUMP_NUM_PARTS_DUMPED + .with_label_values(&[&shard_id.to_string()]) + .set(0); + if shard_dump.num_parts > 0 { + Some(Arc::new(PartUploader { + clock: self.clock.clone(), + external: self.external.clone(), + runtime: self.runtime.clone(), + chain_id: self.chain_id.clone(), + epoch_id: dump.epoch_id, + epoch_height: dump.epoch_height, + sync_prev_prev_hash: dump.sync_prev_prev_hash, + shard_id: *shard_id, + state_root: shard_dump.state_root, + num_parts: shard_dump.num_parts, + parts_dumped: shard_dump.parts_dumped.clone(), + parts_missing: shard_dump.parts_missing.clone(), + obtain_parts: self.obtain_parts.clone(), + canceled: dump.canceled.clone(), + })) + } else { + empty_shards.insert(shard_id); + None + } + }) + .collect::>(); + for shard_id in empty_shards { + let (sender, _) = senders.remove(shard_id).unwrap(); + let _ = sender.send(Ok(())); + } + assert_eq!(senders.len(), uploaders.len()); + + let mut tasks = uploaders + .iter() + .map(|u| (0..u.num_parts).map(|part_id| (u.clone(), part_id))) + .flatten() + .collect::>(); + // We randomize so different nodes uploading parts don't try to upload in the same order + tasks.shuffle(&mut thread_rng()); + + let future_spawner = self.future_spawner.clone(); + let fut = async move { + let mut tasks = tokio_stream::iter(tasks) + .map(|(u, part_id)| { + let shard_id = u.shard_id; + let task = u.upload_state_part(part_id); + let task = respawn_for_parallelism(&*future_spawner, "upload part", task); + async move { (shard_id, task.await) } + }) + .buffer_unordered(10); + + while let Some((shard_id, result)) = tasks.next().await { + let std::collections::hash_map::Entry::Occupied(mut e) = senders.entry(shard_id) + else { + panic!("shard ID {} missing in state dump handles", shard_id); + }; + let (_, parts_left) = e.get_mut(); + if result.is_err() { + let (sender, _) = e.remove(); + let _ = 
sender.send(result); + return; + } + *parts_left -= 1; + if *parts_left == 0 { + let (sender, _) = e.remove(); + let _ = sender.send(result); + } } }; + self.future_spawner.spawn_boxed("upload_parts", fut.boxed()); + } - if !has_progress { - // Avoid a busy-loop when there is nothing to do. - clock.sleep(iteration_delay).await; + /// Sets the in-memory and on-disk state to reflect that we're currently dumping state for a new epoch, + /// with the info and progress represented in `dump`. + fn new_dump(&mut self, dump: DumpState, sync_hash: CryptoHash) -> anyhow::Result<()> { + for (shard_id, _) in dump.dump_state.iter() { + self.chain + .chain_store() + .set_state_sync_dump_progress( + *shard_id, + Some(StateSyncDumpProgress::InProgress { + epoch_id: dump.epoch_id, + epoch_height: dump.epoch_height, + sync_hash, + }), + ) + .context("failed setting state dump progress")?; } + self.current_dump = CurrentDump::InProgress(dump); + Ok(()) } - tracing::debug!(target: "state_sync_dump", ?shard_id, "Stopped state dump thread"); -} -// Extracts extra data needed for obtaining state parts. -fn get_in_progress_data( - shard_id: ShardId, - sync_hash: CryptoHash, - chain: &Chain, -) -> Result<(StateRoot, u64, CryptoHash), Error> { - let state_header = chain.get_state_response_header(shard_id, sync_hash)?; - let state_root = state_header.chunk_prev_state_root(); - let num_parts = state_header.num_state_parts(); - - let sync_block_header = chain.get_block_header(&sync_hash)?; - let sync_prev_block_header = chain.get_previous_header(&sync_block_header)?; - let sync_prev_prev_hash = sync_prev_block_header.prev_hash(); - Ok((state_root, num_parts, *sync_prev_prev_hash)) -} + // Checks the current epoch and initializes the types associated with dumping its state + // if it hasn't already been dumped. + async fn init(&mut self, iteration_delay: Duration) -> anyhow::Result<()> { + loop { + let Some(sync_header) = self.latest_sync_header()? else { + self.clock.sleep(iteration_delay).await; + continue; + }; + match self.get_dump_state(&sync_header)? 
{ + NewDump::Dump(mut dump, mut senders) => { + self.check_old_progress(sync_header.epoch_id(), &mut dump, &mut senders)?; + if dump.dump_state.is_empty() { + self.current_dump = CurrentDump::Done(*sync_header.epoch_id()); + return Ok(()); + } -fn update_dumped_size_and_cnt_metrics( - shard_id: &ShardId, - epoch_height: EpochHeight, - part_len: Option, - parts_dumped: u64, - num_parts: u64, -) { - if let Some(part_len) = part_len { - metrics::STATE_SYNC_DUMP_SIZE_TOTAL - .with_label_values(&[&epoch_height.to_string(), &shard_id.to_string()]) - .inc_by(part_len as u64); + self.check_stored_headers(&mut dump).await?; + self.store_headers(&mut dump).await?; + + dump.set_missing_parts(&self.external, &self.chain_id).await; + self.start_upload_parts(senders, &dump).await; + self.new_dump(dump, *sync_header.hash())?; + } + NewDump::NoTrackedShards => { + self.current_dump = CurrentDump::Done(*sync_header.epoch_id()); + } + } + return Ok(()); + } } - metrics::STATE_SYNC_DUMP_EPOCH_HEIGHT - .with_label_values(&[&shard_id.to_string()]) - .set(epoch_height as i64); + // Returns when the part upload tasks are finished + async fn check_parts_upload(&mut self) -> anyhow::Result<()> { + let CurrentDump::InProgress(dump) = &mut self.current_dump else { + return std::future::pending().await; + }; + let ((shard_id, result), _, _still_going) = + futures::future::select_all(dump.dump_state.iter_mut().map(|(shard_id, s)| { + async { + let r = (&mut s.upload_parts).await.unwrap(); + (*shard_id, r) + } + .boxed() + })) + .await; + result?; + drop(_still_going); - metrics::STATE_SYNC_DUMP_NUM_PARTS_DUMPED - .with_label_values(&[&shard_id.to_string()]) - .set(parts_dumped as i64); + tracing::info!(target: "state_sync_dump", epoch_id = ?&dump.epoch_id, %shard_id, "Shard dump finished"); - metrics::STATE_SYNC_DUMP_NUM_PARTS_TOTAL - .with_label_values(&[&shard_id.to_string()]) - .set(num_parts as i64); -} + self.chain + .chain_store() + .set_state_sync_dump_progress( + shard_id, + Some(StateSyncDumpProgress::AllDumped { + epoch_id: dump.epoch_id, + epoch_height: dump.epoch_height, + }), + ) + .context("failed setting state dump progress")?; + dump.dump_state.remove(&shard_id); + if dump.dump_state.is_empty() { + self.current_dump = CurrentDump::Done(dump.epoch_id); + } + Ok(()) + } -fn cares_about_shard( - chain: &Chain, - shard_id: &ShardId, - sync_hash: &CryptoHash, - shard_tracker: &ShardTracker, - account_id: &Option, -) -> Result { - let sync_header = chain.get_block_header(&sync_hash)?; - let sync_prev_hash = sync_header.prev_hash(); - Ok(shard_tracker.care_about_shard(account_id.as_ref(), sync_prev_hash, *shard_id, true)) -} + // Checks which parts have already been uploaded possibly by other nodes + // We use &mut so the do_state_sync_dump() future will be Send, which it won't be if we use a normal + // reference because of the Chain field + async fn check_stored_parts(&mut self) { + let CurrentDump::InProgress(dump) = &self.current_dump else { + return; + }; + dump.set_missing_parts(&self.external, &self.chain_id).await; + } -struct LatestEpochInfo { - epoch_id: EpochId, - epoch_height: EpochHeight, - sync_hash: CryptoHash, + /// Check whether there's a new epoch to dump state for. In that case, we start dumping + /// state for the new epoch whether or not we've finished with the old one, since other nodes + /// will be interested in the latest state. + async fn check_head(&mut self) -> anyhow::Result<()> { + let Some(sync_header) = self.latest_sync_header()? 
else { + return Ok(()); + }; + match &self.current_dump { + CurrentDump::InProgress(dump) => { + if &dump.epoch_id == sync_header.epoch_id() { + return Ok(()); + } + dump.canceled.store(true, Ordering::Relaxed); + for (_shard_id, d) in dump.dump_state.iter() { + // Set it to -1 to tell the existing tasks not to set the metrics anymore + d.parts_dumped.store(-1, Ordering::SeqCst); + } + } + CurrentDump::Done(epoch_id) => { + if epoch_id == sync_header.epoch_id() { + return Ok(()); + } + } + CurrentDump::None => {} + }; + match self.get_dump_state(&sync_header)? { + NewDump::Dump(mut dump, sender) => { + self.store_headers(&mut dump).await?; + self.start_upload_parts(sender, &dump).await; + self.new_dump(dump, *sync_header.hash())?; + } + NewDump::NoTrackedShards => { + self.current_dump = CurrentDump::Done(*sync_header.epoch_id()); + } + }; + Ok(()) + } } -/// return epoch_id and sync_hash of the latest complete epoch available locally. -fn get_latest_epoch( - shard_id: &ShardId, - chain: &Chain, +const CHECK_STORED_PARTS_INTERVAL: Duration = Duration::seconds(20); + +/// Main entry point into the state dumper. Initializes the state dumper and starts a loop that periodically +/// checks whether there's a new epoch to dump state for. +async fn state_sync_dump( + clock: Clock, + chain: Chain, epoch_manager: Arc, -) -> Result, Error> { - let head = chain.head()?; - tracing::debug!(target: "state_sync_dump", ?shard_id, "Check if a new complete epoch is available"); - let hash = head.last_block_hash; - let header = chain.get_block_header(&hash)?; - let final_hash = header.last_final_block(); - if final_hash == &CryptoHash::default() { - return Ok(None); + shard_tracker: ShardTracker, + runtime: Arc, + chain_id: String, + external: ExternalConnection, + iteration_delay: Duration, + validator: MutableValidatorSigner, + keep_running: Arc, + future_spawner: Arc, +) -> anyhow::Result<()> { + tracing::info!(target: "state_sync_dump", "Running StateSyncDump loop"); + + let mut dumper = StateDumper::new( + clock.clone(), + chain_id, + validator, + shard_tracker, + chain, + epoch_manager, + runtime, + external, + future_spawner, + ); + dumper.init(iteration_delay).await?; + + let now = clock.now(); + // This is set to zero in some tests where the block production delay is very small (10 millis). + // In that case we'll actually just wait for 1 millisecond. The previous behavior was to call + // clock.sleep(ZERO), but setting it to 1 is probably fine, and works with the Instant below. + let min_iteration_delay = Duration::milliseconds(1); + let mut check_head = + Interval::new(now + iteration_delay, iteration_delay.max(min_iteration_delay)); + let mut check_stored_parts = + Interval::new(now + CHECK_STORED_PARTS_INTERVAL, CHECK_STORED_PARTS_INTERVAL); + + while keep_running.load(Ordering::Relaxed) { + tokio::select! { + _ = check_head.tick(&clock) => { + dumper.check_head().await?; + } + _ = check_stored_parts.tick(&clock) => { + dumper.check_stored_parts().await; + } + result = dumper.check_parts_upload() => { + result?; + } + } } - let Some(sync_hash) = chain.get_sync_hash(final_hash)? 
else { - return Ok(None); - }; - let final_block_header = chain.get_block_header(&final_hash)?; - let epoch_id = *final_block_header.epoch_id(); - let epoch_info = epoch_manager.get_epoch_info(&epoch_id)?; - let epoch_height = epoch_info.epoch_height(); - tracing::debug!(target: "state_sync_dump", ?final_hash, ?sync_hash, ?epoch_id, epoch_height, "get_latest_epoch"); + tracing::debug!(target: "state_sync_dump", "Stopped state dump thread"); + Ok(()) +} - Ok(Some(LatestEpochInfo { epoch_id, epoch_height, sync_hash })) +async fn do_state_sync_dump( + clock: Clock, + chain: Chain, + epoch_manager: Arc, + shard_tracker: ShardTracker, + runtime: Arc, + chain_id: String, + external: ExternalConnection, + iteration_delay: Duration, + validator: MutableValidatorSigner, + keep_running: Arc, + future_spawner: Arc, +) { + if let Err(error) = state_sync_dump( + clock, + chain, + epoch_manager, + shard_tracker, + runtime, + chain_id, + external, + iteration_delay, + validator, + keep_running, + future_spawner, + ) + .await + { + tracing::error!(target: "state_sync_dump", ?error, "State dumper failed"); + } } diff --git a/nearcore/src/test_utils.rs b/nearcore/src/test_utils.rs index 6586172dfb6..baafc533102 100644 --- a/nearcore/src/test_utils.rs +++ b/nearcore/src/test_utils.rs @@ -1,5 +1,5 @@ use near_chain::types::RuntimeAdapter; -use near_chain_configs::Genesis; +use near_chain_configs::{Genesis, DEFAULT_GC_NUM_EPOCHS_TO_KEEP}; use near_client::test_utils::TestEnvBuilder; use near_epoch_manager::EpochManagerHandle; use near_parameters::RuntimeConfigStore; @@ -101,6 +101,7 @@ impl TestEnvNightshadeSetupExt for TestEnvBuilder { Some(runtime_config_store), trie_config, state_snapshot_type.clone(), + DEFAULT_GC_NUM_EPOCHS_TO_KEEP, ) }; let dummy_runtime_configs = diff --git a/tools/indexer/example/Cargo.toml b/tools/indexer/example/Cargo.toml index 2801855ae3f..b4ee9468943 100644 --- a/tools/indexer/example/Cargo.toml +++ b/tools/indexer/example/Cargo.toml @@ -23,3 +23,4 @@ tracing.workspace = true near-config-utils.workspace = true near-indexer.workspace = true near-o11y.workspace = true +near-primitives.workspace = true diff --git a/tools/indexer/example/src/main.rs b/tools/indexer/example/src/main.rs index 007feded2eb..5df39bfcbf4 100644 --- a/tools/indexer/example/src/main.rs +++ b/tools/indexer/example/src/main.rs @@ -275,6 +275,7 @@ fn main() -> Result<()> { home_dir, sync_mode: near_indexer::SyncModeEnum::FromInterruption, await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::WaitForFullSync, + finality: near_primitives::types::Finality::Final, validate_genesis: true, }; let system = actix::System::new(); diff --git a/tools/mirror/src/lib.rs b/tools/mirror/src/lib.rs index 027c60a8ec6..bfde1e38358 100644 --- a/tools/mirror/src/lib.rs +++ b/tools/mirror/src/lib.rs @@ -1763,6 +1763,7 @@ impl TxMirror { home_dir, sync_mode: near_indexer::SyncModeEnum::FromInterruption, await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::StreamWhileSyncing, + finality: Finality::Final, validate_genesis: false, }) .context("failed to start target chain indexer")?;
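
The new state dumper above combines three concurrency building blocks: a `Semaphore` that caps how many CPU-heavy part generations run at once, `buffer_unordered` to run a bounded number of uploads concurrently, and a `oneshot` channel that reports when a shard's parts are all done. The standalone sketch below only illustrates that pattern; it is not part of this diff, and `fake_generate_part`/`fake_upload` are hypothetical stand-ins for `obtain_state_part()` and `ExternalConnection::put_file()`.

use std::sync::Arc;

use futures::StreamExt;
use tokio::sync::{oneshot, Semaphore};

// Stand-in for the CPU-heavy RuntimeAdapter::obtain_state_part() call.
async fn fake_generate_part(part_id: u64) -> Vec<u8> {
    vec![part_id as u8; 8]
}

// Stand-in for ExternalConnection::put_file().
async fn fake_upload(_part: Vec<u8>) {}

// Generate and upload all parts of one shard, then report completion on `done`,
// similar to how each ShardDump's `upload_parts` receiver is completed.
async fn dump_shard(num_parts: u64, done: oneshot::Sender<anyhow::Result<()>>) {
    // Cap concurrent part generation, like the dumper's `obtain_parts` semaphore.
    let obtain_parts = Arc::new(Semaphore::new(4));
    let tasks = (0..num_parts).map(|part_id| {
        let obtain_parts = obtain_parts.clone();
        async move {
            let part = {
                // Hold a permit only while generating the part.
                let _permit = obtain_parts.acquire().await.unwrap();
                fake_generate_part(part_id).await
            };
            fake_upload(part).await;
        }
    });
    // Run up to 10 uploads concurrently, in whatever order they finish.
    tokio_stream::iter(tasks).buffer_unordered(10).collect::<()>().await;
    let _ = done.send(Ok(()));
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = oneshot::channel();
    tokio::spawn(dump_shard(16, sender));
    receiver.await.unwrap().unwrap();
    println!("all parts dumped");
}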