Skip to content

Commit

Permalink
Merge branch 'master' into typos
Browse files Browse the repository at this point in the history
  • Loading branch information
wacban authored Jan 7, 2025
2 parents b8155e1 + b2fb586 commit 0575e57
Show file tree
Hide file tree
Showing 17 changed files with 1,180 additions and 583 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

164 changes: 93 additions & 71 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,28 +795,27 @@ impl Chain {
fn get_state_sync_info(
&self,
me: &Option<AccountId>,
epoch_first_block: &Block,
epoch_id: &EpochId,
block_hash: &CryptoHash,
prev_hash: &CryptoHash,
prev_prev_hash: &CryptoHash,
) -> Result<Option<StateSyncInfo>, Error> {
let prev_hash = *epoch_first_block.header().prev_hash();
let shards_to_state_sync = Chain::get_shards_to_state_sync(
self.epoch_manager.as_ref(),
&self.shard_tracker,
me,
&prev_hash,
prev_hash,
prev_prev_hash,
)?;
if shards_to_state_sync.is_empty() {
Ok(None)
} else {
debug!(target: "chain", "Downloading state for {:?}, I'm {:?}", shards_to_state_sync, me);
let epoch_id = epoch_first_block.header().epoch_id();
let protocol_version = self.epoch_manager.get_epoch_protocol_version(epoch_id)?;
// Note that this block is the first block in an epoch because this function is only called
// in get_catchup_and_state_sync_infos() when that is the case.
let state_sync_info = StateSyncInfo::new(
protocol_version,
*epoch_first_block.header().hash(),
shards_to_state_sync,
);
let state_sync_info =
StateSyncInfo::new(protocol_version, *block_hash, shards_to_state_sync);
Ok(Some(state_sync_info))
}
}
Expand Down Expand Up @@ -2271,7 +2270,7 @@ impl Chain {
// First real I/O expense.
let prev = self.get_previous_header(header)?;
let prev_hash = *prev.hash();
let prev_prev_hash = *prev.prev_hash();
let prev_prev_hash = prev.prev_hash();
let gas_price = prev.next_gas_price();
let prev_random_value = *prev.random_value();
let prev_height = prev.height();
Expand All @@ -2281,8 +2280,13 @@ impl Chain {
return Err(Error::InvalidBlockHeight(prev_height));
}

let (is_caught_up, state_sync_info) =
self.get_catchup_and_state_sync_infos(header, prev_hash, prev_prev_hash, me, block)?;
let (is_caught_up, state_sync_info) = self.get_catchup_and_state_sync_infos(
header.epoch_id(),
header.hash(),
&prev_hash,
prev_prev_hash,
me,
)?;

self.check_if_challenged_block_on_chain(header)?;

Expand Down Expand Up @@ -2375,29 +2379,32 @@ impl Chain {

fn get_catchup_and_state_sync_infos(
&self,
header: &BlockHeader,
prev_hash: CryptoHash,
prev_prev_hash: CryptoHash,
epoch_id: &EpochId,
block_hash: &CryptoHash,
prev_hash: &CryptoHash,
prev_prev_hash: &CryptoHash,
me: &Option<AccountId>,
block: &MaybeValidated<Block>,
) -> Result<(bool, Option<StateSyncInfo>), Error> {
if self.epoch_manager.is_next_block_epoch_start(&prev_hash)? {
debug!(target: "chain", block_hash=?header.hash(), "block is the first block of an epoch");
if !self.prev_block_is_caught_up(&prev_prev_hash, &prev_hash)? {
// The previous block is not caught up for the next epoch relative to the previous
// block, which is the current epoch for this block, so this block cannot be applied
// at all yet, needs to be orphaned
return Err(Error::Orphan);
}

// For the first block of the epoch we check if we need to start download states for
// shards that we will care about in the next epoch. If there is no state to be downloaded,
// we consider that we are caught up, otherwise not
let state_sync_info = self.get_state_sync_info(me, block)?;
Ok((state_sync_info.is_none(), state_sync_info))
} else {
Ok((self.prev_block_is_caught_up(&prev_prev_hash, &prev_hash)?, None))
if !self.epoch_manager.is_next_block_epoch_start(prev_hash)? {
return Ok((self.prev_block_is_caught_up(prev_prev_hash, prev_hash)?, None));
}
if !self.prev_block_is_caught_up(prev_prev_hash, prev_hash)? {
// The previous block is not caught up for the next epoch relative to the previous
// block, which is the current epoch for this block, so this block cannot be applied
// at all yet, needs to be orphaned
return Err(Error::Orphan);
}

// For the first block of the epoch we check if we need to start download states for
// shards that we will care about in the next epoch. If there is no state to be downloaded,
// we consider that we are caught up, otherwise not
let state_sync_info =
self.get_state_sync_info(me, epoch_id, block_hash, prev_hash, prev_prev_hash)?;
debug!(
target: "chain", %block_hash, shards_to_sync=?state_sync_info.as_ref().map(|s| s.shards()),
"Checked for shards to sync for epoch T+1 upon processing first block of epoch T"
);
Ok((state_sync_info.is_none(), state_sync_info))
}

pub fn prev_block_is_caught_up(
Expand Down Expand Up @@ -2425,56 +2432,71 @@ impl Chain {
shard_tracker: &ShardTracker,
me: &Option<AccountId>,
parent_hash: &CryptoHash,
prev_prev_hash: &CryptoHash,
) -> Result<Vec<ShardId>, Error> {
let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash)?;
Ok((epoch_manager.shard_ids(&epoch_id)?)
.into_iter()
.filter(|shard_id| {
Self::should_catch_up_shard(
epoch_manager,
shard_tracker,
me,
parent_hash,
*shard_id,
)
})
.collect())
let mut shards_to_sync = Vec::new();
for shard_id in epoch_manager.shard_ids(&epoch_id)? {
if Self::should_catch_up_shard(
epoch_manager,
shard_tracker,
me,
&epoch_id,
parent_hash,
prev_prev_hash,
shard_id,
)? {
shards_to_sync.push(shard_id)
}
}
Ok(shards_to_sync)
}

/// Returns whether we need to initiate state sync for the given `shard_id` for the epoch
/// beginning after the block `epoch_last_block`. If that epoch is epoch T, the logic is:
/// - will track the shard in epoch T+1
/// - AND not tracking it in T
/// - AND didn't track it in T-1
/// We check that we didn't track it in T-1 because if so, and we're in the relatively rare case
/// where we'll go from tracking it to not tracking it and back to tracking it in consecutive epochs,
/// then we can just continue to apply chunks as if we were tracking it in epoch T, and there's no need to state sync.
fn should_catch_up_shard(
epoch_manager: &dyn EpochManagerAdapter,
shard_tracker: &ShardTracker,
me: &Option<AccountId>,
parent_hash: &CryptoHash,
epoch_id: &EpochId,
epoch_last_block: &CryptoHash,
epoch_last_block_prev: &CryptoHash,
shard_id: ShardId,
) -> bool {
let result = epoch_manager.will_shard_layout_change(parent_hash);
let will_shard_layout_change = match result {
Ok(_will_shard_layout_change) => {
// TODO(#11881): before state sync is fixed, we don't catch up
// split shards. Assume that all needed shards are tracked
// already.
// will_shard_layout_change,
false
}
Err(err) => {
// TODO(resharding) This is a problem, if this happens the node
// will not perform resharding and fall behind the network.
tracing::error!(target: "chain", ?err, "failed to check if shard layout will change");
false
}
};
// if shard layout will change the next epoch, we should catch up the shard regardless
// whether we already have the shard's state this epoch, because we need to generate
// new states for shards split from the current shard for the next epoch
let will_care_about_shard =
shard_tracker.will_care_about_shard(me.as_ref(), parent_hash, shard_id, true);
let does_care_about_shard =
shard_tracker.care_about_shard(me.as_ref(), parent_hash, shard_id, true);
) -> Result<bool, Error> {
// Won't care about it next epoch, no need to state sync it.
if !shard_tracker.will_care_about_shard(me.as_ref(), epoch_last_block, shard_id, true) {
return Ok(false);
}
// Currently tracking the shard, so no need to state sync it.
if shard_tracker.care_about_shard(me.as_ref(), epoch_last_block, shard_id, true) {
return Ok(false);
}

tracing::debug!(target: "chain", does_care_about_shard, will_care_about_shard, will_shard_layout_change, "should catch up shard");
// Now we need to state sync it unless we were tracking the parent in the previous epoch,
// in which case we don't need to because we already have the state, and can just continue applying chunks
if epoch_id == &EpochId::default() {
return Ok(true);
}

will_care_about_shard && (will_shard_layout_change || !does_care_about_shard)
let (_layout, parent_shard_id, _index) =
epoch_manager.get_prev_shard_id_from_prev_hash(epoch_last_block, shard_id)?;
// Note that here passing `epoch_last_block_prev` to care_about_shard() will have us check whether we were tracking it in
// the previous epoch, because it is the "parent_hash" of the last block of the previous epoch.
// TODO: consider refactoring these ShardTracker functions to accept an epoch_id
// to make this less tricky.
let tracked_before = shard_tracker.care_about_shard(
me.as_ref(),
epoch_last_block_prev,
parent_shard_id,
true,
);
Ok(!tracked_before)
}

/// Check if any block with missing chunk is ready to be processed and start processing these blocks
Expand Down
4 changes: 2 additions & 2 deletions chain/chain/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ impl RuntimeAdapter for NightshadeRuntime {
// flat storage by not charging gas for trie nodes.
// WARNING: should never be used in production! Consider this option only for debugging or replaying blocks.
let mut trie = self.tries.get_trie_for_shard(shard_uid, storage_config.state_root);
trie.dont_charge_gas_for_trie_node_access();
trie.set_charge_gas_for_trie_node_access(false);
trie
}
StorageDataSource::Recorded(storage) => Trie::from_recorded_storage(
Expand Down Expand Up @@ -862,7 +862,7 @@ impl RuntimeAdapter for NightshadeRuntime {
storage_config.state_root,
false,
)?;
trie.dont_charge_gas_for_trie_node_access();
trie.set_charge_gas_for_trie_node_access(false);
trie
}
StorageDataSource::Recorded(storage) => Trie::from_recorded_storage(
Expand Down
39 changes: 29 additions & 10 deletions chain/chain/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1025,16 +1025,35 @@ impl ChainStore {
key
}

/// Retrieves STATE_SYNC_DUMP for the given shard.
pub fn get_state_sync_dump_progress(
&self,
shard_id: ShardId,
) -> Result<StateSyncDumpProgress, Error> {
option_to_not_found(
self.store
.get_ser(DBCol::BlockMisc, &ChainStore::state_sync_dump_progress_key(shard_id)),
format!("STATE_SYNC_DUMP:{}", shard_id),
)
/// For each value stored, this returs an (EpochId, bool), where the bool tells whether it's finished
/// because those are the only fields we really care about.
pub fn iter_state_sync_dump_progress<'a>(
&'a self,
) -> impl Iterator<Item = io::Result<(ShardId, (EpochId, bool))>> + 'a {
self.store
.iter_prefix_ser::<StateSyncDumpProgress>(DBCol::BlockMisc, STATE_SYNC_DUMP_KEY)
.map(|item| {
item.and_then(|(key, progress)| {
// + 1 for the ':'
let prefix_len = STATE_SYNC_DUMP_KEY.len() + 1;
let int_part = &key[prefix_len..];
let int_part = int_part.try_into().map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Bad StateSyncDump columnn key length: {}", key.len()),
)
})?;
let shard_id = ShardId::from_le_bytes(int_part);
Ok((
shard_id,
match progress {
StateSyncDumpProgress::AllDumped { epoch_id, .. } => (epoch_id, true),
StateSyncDumpProgress::InProgress { epoch_id, .. } => (epoch_id, false),
StateSyncDumpProgress::Skipped { epoch_id, .. } => (epoch_id, true),
},
))
})
})
}

/// Updates STATE_SYNC_DUMP for the given shard.
Expand Down
20 changes: 1 addition & 19 deletions chain/client/src/sync/state/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::task_tracker::TaskTracker;
use crate::metrics;
use crate::sync::state::chain_requests::ChainFinalizationRequest;
use futures::{StreamExt, TryStreamExt};
use near_async::futures::{FutureSpawner, FutureSpawnerExt};
use near_async::futures::{respawn_for_parallelism, FutureSpawner};
use near_async::messaging::AsyncSender;
use near_chain::types::RuntimeAdapter;
use near_chain::BlockHeader;
Expand Down Expand Up @@ -280,21 +280,3 @@ async fn apply_state_part(
)?;
Ok(())
}

/// Given a future, respawn it as an equivalent future but which does not block the
/// driver of the future. For example, if the given future directly performs
/// computation, normally the whoever drives the future (such as a buffered_unordered)
/// would be blocked by the computation, thereby not allowing computation of other
/// futures driven by the same driver to proceed. This function respawns the future
/// onto the FutureSpawner, so the driver of the returned future would not be blocked.
fn respawn_for_parallelism<T: Send + 'static>(
future_spawner: &dyn FutureSpawner,
name: &'static str,
f: impl std::future::Future<Output = T> + Send + 'static,
) -> impl std::future::Future<Output = T> + Send + 'static {
let (sender, receiver) = tokio::sync::oneshot::channel();
future_spawner.spawn(name, async move {
sender.send(f.await).ok();
});
async move { receiver.await.unwrap() }
}
18 changes: 18 additions & 0 deletions core/async/src/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,24 @@ impl FutureSpawnerExt for dyn FutureSpawner + '_ {
}
}

/// Given a future, respawn it as an equivalent future but which does not block the
/// driver of the future. For example, if the given future directly performs
/// computation, normally the whoever drives the future (such as a buffered_unordered)
/// would be blocked by the computation, thereby not allowing computation of other
/// futures driven by the same driver to proceed. This function respawns the future
/// onto the FutureSpawner, so the driver of the returned future would not be blocked.
pub fn respawn_for_parallelism<T: Send + 'static>(
future_spawner: &dyn FutureSpawner,
name: &'static str,
f: impl std::future::Future<Output = T> + Send + 'static,
) -> impl std::future::Future<Output = T> + Send + 'static {
let (sender, receiver) = tokio::sync::oneshot::channel();
future_spawner.spawn(name, async move {
sender.send(f.await).ok();
});
async move { receiver.await.unwrap() }
}

/// A FutureSpawner that hands over the future to Actix.
pub struct ActixFutureSpawner;

Expand Down
11 changes: 8 additions & 3 deletions core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,11 +694,16 @@ impl Trie {
)))),
None => RefCell::new(TrieAccountingCache::new(None)),
};
// Technically the charge_gas_for_trie_node_access should be set based
// on the flat storage protocol feature. When flat storage is enabled
// the trie node access should be free and the charge flag should be set
// to false.
let charge_gas_for_trie_node_access = false;
Trie {
storage,
memtries,
root,
charge_gas_for_trie_node_access: flat_storage_chunk_view.is_none(),
charge_gas_for_trie_node_access,
flat_storage_chunk_view,
accounting_cache,
recorder: None,
Expand All @@ -711,8 +716,8 @@ impl Trie {
}

/// Helper to simulate gas costs as if flat storage was present.
pub fn dont_charge_gas_for_trie_node_access(&mut self) {
self.charge_gas_for_trie_node_access = false;
pub fn set_charge_gas_for_trie_node_access(&mut self, value: bool) {
self.charge_gas_for_trie_node_access = value;
}

/// Makes a new trie that has everything the same except that access
Expand Down
4 changes: 3 additions & 1 deletion core/store/src/trie/trie_recording.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,9 @@ mod trie_recording_tests {
false,
)
} else {
tries.get_trie_for_shard(shard_uid, state_root)
let mut trie = tries.get_trie_for_shard(shard_uid, state_root);
trie.charge_gas_for_trie_node_access = true;
trie
}
}

Expand Down
Loading

0 comments on commit 0575e57

Please sign in to comment.