Commit
Merge branch 'master' into stefan/improved_parallel
stedfn authored Jan 7, 2025
2 parents 445ff01 + 9d535a8 commit 0043c54
Showing 31 changed files with 1,526 additions and 626 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -305,7 +305,7 @@ to pay for the storage of their accounts.
`sum(rate(near_peer_message_received_by_type_total{...}[5m]))`.
[#7548](https://github.com/near/nearcore/pull/7548)
* Few changes to `view_state` JSON RPC query:
- The requset has now an optional `include_proof` argument. When set to
- The request has now an optional `include_proof` argument. When set to
`true`, response’s `proof` will be populated.
- The `proof` within each value in `values` list of a `view_state` response is
now deprecated and will be removed in the future. Client code should ignore
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

@@ -1,4 +1,4 @@
-- Setting defaul values to enable `not null` by filling values for existing
-- Setting default values to enable `not null` by filling values for existing
-- rows.
alter table ft_transfers
add column initiator text not null default 'crt-benchmarks',
169 changes: 95 additions & 74 deletions chain/chain/src/chain.rs
@@ -795,28 +795,27 @@ impl Chain {
fn get_state_sync_info(
&self,
me: &Option<AccountId>,
epoch_first_block: &Block,
epoch_id: &EpochId,
block_hash: &CryptoHash,
prev_hash: &CryptoHash,
prev_prev_hash: &CryptoHash,
) -> Result<Option<StateSyncInfo>, Error> {
let prev_hash = *epoch_first_block.header().prev_hash();
let shards_to_state_sync = Chain::get_shards_to_state_sync(
self.epoch_manager.as_ref(),
&self.shard_tracker,
me,
&prev_hash,
prev_hash,
prev_prev_hash,
)?;
if shards_to_state_sync.is_empty() {
Ok(None)
} else {
debug!(target: "chain", "Downloading state for {:?}, I'm {:?}", shards_to_state_sync, me);
let epoch_id = epoch_first_block.header().epoch_id();
let protocol_version = self.epoch_manager.get_epoch_protocol_version(epoch_id)?;
// Note that this block is the first block in an epoch because this function is only called
// in get_catchup_and_state_sync_infos() when that is the case.
let state_sync_info = StateSyncInfo::new(
protocol_version,
*epoch_first_block.header().hash(),
shards_to_state_sync,
);
let state_sync_info =
StateSyncInfo::new(protocol_version, *block_hash, shards_to_state_sync);
Ok(Some(state_sync_info))
}
}
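// Parameter roles, inferred from the call sites below rather than stated in the
// source: `block_hash` is the first block of an epoch (this function is only
// reached from get_catchup_and_state_sync_infos() in that case), `prev_hash` is
// that block's predecessor, i.e. the last block of the previous epoch, and
// `prev_prev_hash` is the block before that, which should_catch_up_shard() uses
// to check whether a shard was already tracked in the earlier epoch.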
@@ -2024,7 +2023,6 @@ impl Chain {
tracing::debug!(target: "chain", ?shard_id, need_storage_update, "Updating storage");

if need_storage_update {
// TODO(resharding): consider adding to catchup flow.
self.resharding_manager.start_resharding(
self.chain_store.store_update(),
&block,
@@ -2272,7 +2270,7 @@ impl Chain {
// First real I/O expense.
let prev = self.get_previous_header(header)?;
let prev_hash = *prev.hash();
let prev_prev_hash = *prev.prev_hash();
let prev_prev_hash = prev.prev_hash();
let gas_price = prev.next_gas_price();
let prev_random_value = *prev.random_value();
let prev_height = prev.height();
@@ -2282,8 +2280,13 @@
return Err(Error::InvalidBlockHeight(prev_height));
}

let (is_caught_up, state_sync_info) =
self.get_catchup_and_state_sync_infos(header, prev_hash, prev_prev_hash, me, block)?;
let (is_caught_up, state_sync_info) = self.get_catchup_and_state_sync_infos(
header.epoch_id(),
header.hash(),
&prev_hash,
prev_prev_hash,
me,
)?;

self.check_if_challenged_block_on_chain(header)?;

@@ -2376,29 +2379,32 @@

fn get_catchup_and_state_sync_infos(
&self,
header: &BlockHeader,
prev_hash: CryptoHash,
prev_prev_hash: CryptoHash,
epoch_id: &EpochId,
block_hash: &CryptoHash,
prev_hash: &CryptoHash,
prev_prev_hash: &CryptoHash,
me: &Option<AccountId>,
block: &MaybeValidated<Block>,
) -> Result<(bool, Option<StateSyncInfo>), Error> {
if self.epoch_manager.is_next_block_epoch_start(&prev_hash)? {
debug!(target: "chain", block_hash=?header.hash(), "block is the first block of an epoch");
if !self.prev_block_is_caught_up(&prev_prev_hash, &prev_hash)? {
// The previous block is not caught up for the next epoch relative to the previous
// block, which is the current epoch for this block, so this block cannot be applied
// at all yet, needs to be orphaned
return Err(Error::Orphan);
}

// For the first block of the epoch we check if we need to start downloading states for
// shards that we will care about in the next epoch. If there is no state to be downloaded,
// we consider that we are caught up, otherwise not
let state_sync_info = self.get_state_sync_info(me, block)?;
Ok((state_sync_info.is_none(), state_sync_info))
} else {
Ok((self.prev_block_is_caught_up(&prev_prev_hash, &prev_hash)?, None))
if !self.epoch_manager.is_next_block_epoch_start(prev_hash)? {
return Ok((self.prev_block_is_caught_up(prev_prev_hash, prev_hash)?, None));
}
if !self.prev_block_is_caught_up(prev_prev_hash, prev_hash)? {
// The previous block is not caught up for the next epoch relative to the previous
// block, which is the current epoch for this block, so this block cannot be applied
// at all yet, needs to be orphaned
return Err(Error::Orphan);
}

// For the first block of the epoch we check if we need to start downloading states for
// shards that we will care about in the next epoch. If there is no state to be downloaded,
// we consider that we are caught up, otherwise not
let state_sync_info =
self.get_state_sync_info(me, epoch_id, block_hash, prev_hash, prev_prev_hash)?;
debug!(
target: "chain", %block_hash, shards_to_sync=?state_sync_info.as_ref().map(|s| s.shards()),
"Checked for shards to sync for epoch T+1 upon processing first block of epoch T"
);
Ok((state_sync_info.is_none(), state_sync_info))
}
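// Return-value summary (a reading of the refactored control flow above, not a
// comment present in the original file):
//   * this block is not the first block of its epoch  -> (prev_block_is_caught_up, None)
//   * first block of an epoch, previous block not yet caught up -> Err(Error::Orphan)
//   * first block of an epoch, no shards need state sync -> (true, None)
//   * first block of an epoch, some shards need state sync -> (false, Some(StateSyncInfo))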

pub fn prev_block_is_caught_up(
@@ -2426,56 +2432,71 @@ impl Chain {
shard_tracker: &ShardTracker,
me: &Option<AccountId>,
parent_hash: &CryptoHash,
prev_prev_hash: &CryptoHash,
) -> Result<Vec<ShardId>, Error> {
let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash)?;
Ok((epoch_manager.shard_ids(&epoch_id)?)
.into_iter()
.filter(|shard_id| {
Self::should_catch_up_shard(
epoch_manager,
shard_tracker,
me,
parent_hash,
*shard_id,
)
})
.collect())
let mut shards_to_sync = Vec::new();
for shard_id in epoch_manager.shard_ids(&epoch_id)? {
if Self::should_catch_up_shard(
epoch_manager,
shard_tracker,
me,
&epoch_id,
parent_hash,
prev_prev_hash,
shard_id,
)? {
shards_to_sync.push(shard_id)
}
}
Ok(shards_to_sync)
}

/// Returns whether we need to initiate state sync for the given `shard_id` for the epoch
/// beginning after the block `epoch_last_block`. If that epoch is epoch T, the logic is:
/// - will track the shard in epoch T+1
/// - AND not tracking it in T
/// - AND didn't track it in T-1
/// We check that we didn't track it in T-1 because if so, and we're in the relatively rare case
/// where we'll go from tracking it to not tracking it and back to tracking it in consecutive epochs,
/// then we can just continue to apply chunks as if we were tracking it in epoch T, and there's no need to state sync.
fn should_catch_up_shard(
epoch_manager: &dyn EpochManagerAdapter,
shard_tracker: &ShardTracker,
me: &Option<AccountId>,
parent_hash: &CryptoHash,
epoch_id: &EpochId,
epoch_last_block: &CryptoHash,
epoch_last_block_prev: &CryptoHash,
shard_id: ShardId,
) -> bool {
let result = epoch_manager.will_shard_layout_change(parent_hash);
let will_shard_layout_change = match result {
Ok(_will_shard_layout_change) => {
// TODO(#11881): before state sync is fixed, we don't catch up
// split shards. Assume that all needed shards are tracked
// already.
// will_shard_layout_change,
false
}
Err(err) => {
// TODO(resharding) This is a problem, if this happens the node
// will not perform resharding and fall behind the network.
tracing::error!(target: "chain", ?err, "failed to check if shard layout will change");
false
}
};
// if shard layout will change the next epoch, we should catch up the shard regardless
// whether we already have the shard's state this epoch, because we need to generate
// new states for shards split from the current shard for the next epoch
let will_care_about_shard =
shard_tracker.will_care_about_shard(me.as_ref(), parent_hash, shard_id, true);
let does_care_about_shard =
shard_tracker.care_about_shard(me.as_ref(), parent_hash, shard_id, true);
) -> Result<bool, Error> {
// Won't care about it next epoch, no need to state sync it.
if !shard_tracker.will_care_about_shard(me.as_ref(), epoch_last_block, shard_id, true) {
return Ok(false);
}
// Currently tracking the shard, so no need to state sync it.
if shard_tracker.care_about_shard(me.as_ref(), epoch_last_block, shard_id, true) {
return Ok(false);
}

tracing::debug!(target: "chain", does_care_about_shard, will_care_about_shard, will_shard_layout_change, "should catch up shard");
// Now we need to state sync it unless we were tracking the parent in the previous epoch,
// in which case we don't need to because we already have the state, and can just continue applying chunks
if epoch_id == &EpochId::default() {
return Ok(true);
}

will_care_about_shard && (will_shard_layout_change || !does_care_about_shard)
let (_layout, parent_shard_id, _index) =
epoch_manager.get_prev_shard_id_from_prev_hash(epoch_last_block, shard_id)?;
// Note that here passing `epoch_last_block_prev` to care_about_shard() will have us check whether we were tracking it in
// the previous epoch, because it is the "parent_hash" of the last block of the previous epoch.
// TODO: consider refactoring these ShardTracker functions to accept an epoch_id
// to make this less tricky.
let tracked_before = shard_tracker.care_about_shard(
me.as_ref(),
epoch_last_block_prev,
parent_shard_id,
true,
);
Ok(!tracked_before)
}
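// A minimal sketch of the rule documented above (illustrative only; the helper and
// its name are hypothetical and it ignores the parent-shard lookup that handles
// resharding): state sync is needed only when the shard will be tracked in epoch
// T+1 but was tracked neither in T nor in T-1.
fn needs_state_sync_sketch(tracked_t_minus_1: bool, tracked_t: bool, tracks_t_plus_1: bool) -> bool {
    tracks_t_plus_1 && !tracked_t && !tracked_t_minus_1
}
// For example, a node that stops tracking a shard for exactly one epoch and then
// resumes it (true, false, true) keeps applying chunks instead of state syncing.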

/// Check if any block with missing chunk is ready to be processed and start processing these blocks
@@ -3127,8 +3148,8 @@ impl Chain {
}
blocks_catch_up_state.done_blocks.push(queued_block);
}
Err(_) => {
error!("Error processing block during catch up, retrying");
Err(err) => {
error!(target: "chain", ?err, "Error processing block during catch up, retrying");
blocks_catch_up_state.pending_blocks.push(queued_block);
}
}
74 changes: 70 additions & 4 deletions chain/chain/src/flat_storage_resharder.rs
@@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex};
use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle};
use near_chain_primitives::Error;

use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

use crate::resharding::event_type::{ReshardingEventType, ReshardingSplitShardParams};
use crate::resharding::types::{
@@ -1025,9 +1025,16 @@ fn copy_kv_to_child(

// Sanity check we are truly writing to one of the expected children shards.
if new_shard_uid != *left_child_shard && new_shard_uid != *right_child_shard {
let err_msg = "account id doesn't map to any child shard!";
error!(target: "resharding", ?new_shard_uid, ?left_child_shard, ?right_child_shard, ?shard_layout, ?account_id, err_msg);
return Err(Error::ReshardingError(err_msg.to_string()));
let err_msg = "account id doesn't map to any child shard! - skipping it";
warn!(target: "resharding", ?new_shard_uid, ?left_child_shard, ?right_child_shard, ?shard_layout, ?account_id, err_msg);

// TODO(resharding): add a debug assertion once the root cause is fixed. The current
// hypothesis is that flat storage might contain keys with account_id outside of the shard's
// boundary due to either a bug in the state generation for forknet or corrupted state in
// mainnet.

// Do not fail resharding. Just skip this entry.
return Ok(());
}
// Add the new flat store entry.
store_update.set(new_shard_uid, key, value);
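// Context for the change above (inferred from this diff, not a comment in the
// source): copy_kv_to_child routes each parent flat-storage entry to whichever
// child shard its account id maps to under the new shard layout. An entry that
// maps to neither child is now logged and skipped instead of failing the whole
// resharding; the test unrelated_account_do_not_fail_splitting added below
// exercises exactly this path.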
@@ -2561,4 +2568,63 @@ mod tests {
Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }))
);
}

/// This test asserts that resharding doesn't fail if flat storage iteration goes over an account
/// which is not part of any children shards after the shard layout changes.
#[test]
fn unrelated_account_do_not_fail_splitting() {
init_test_logger();
let (mut chain, resharder, sender) =
create_chain_resharder_sender::<DelayedSender>(simple_shard_layout());
let new_shard_layout = shard_layout_after_split();
let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout);
let ReshardingSplitShardParams {
parent_shard, left_child_shard, right_child_shard, ..
} = match resharding_event_type.clone() {
ReshardingEventType::SplitShard(params) => params,
};
let flat_store = resharder.runtime.store().flat_store();

// Add two blocks on top of genesis. This will make the resharding block (height 0) final.
add_blocks_to_chain(
&mut chain,
2,
PreviousBlockHeight::ChainHead,
NextBlockHeight::ChainHeadPlusOne,
);

// Inject an account which doesn't belong to the parent shard into its flat storage.
let mut store_update = flat_store.store_update();
let test_value = Some(FlatStateValue::Inlined(vec![0]));
let key = TrieKey::Account { account_id: account!("ab") };
store_update.set(parent_shard, key.to_vec(), test_value);
store_update.commit().unwrap();

// Perform resharding.
assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());
sender.call_split_shard_task();

// Check final status of parent flat storage.
let parent = ShardUId { version: 3, shard_id: 1 };
assert_eq!(flat_store.get_flat_storage_status(parent), Ok(FlatStorageStatus::Empty));
assert_eq!(flat_store.iter(parent).count(), 0);
assert!(resharder
.runtime
.get_flat_storage_manager()
.get_flat_storage_for_shard(parent)
.is_none());

// Check intermediate status of children flat storages.
// If children reached the catching up state, it means that the split task succeeded.
for child in [left_child_shard, right_child_shard] {
assert_eq!(
flat_store.get_flat_storage_status(child),
Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
chain.final_head().unwrap().into()
)))
);
// However, the unrelated account should not end up in any child.
assert!(flat_store.get(child, &key.to_vec()).is_ok_and(|val| val.is_none()));
}
}
}
