Merge branch 'master' into rv3-storage-fix
wacban committed Jan 3, 2025
2 parents 1a9dcd6 + 21b5109 commit 853ddaa
Showing 57 changed files with 1,262 additions and 748 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -11,13 +11,14 @@
 
 ## Reference implementation of NEAR Protocol
 
-![Buildkite](https://img.shields.io/buildkite/0eae07525f8e44a19b48fa937813e2c21ee04aa351361cd851)
+[![Buildkite](https://img.shields.io/buildkite/0eae07525f8e44a19b48fa937813e2c21ee04aa351361cd851)][buildkite]
 ![Stable Status][stable-release]
 ![Prerelease Status][prerelease]
 [![codecov][codecov-badge]][codecov-url]
 [![Discord chat][discord-badge]][discord-url]
 [![Telegram Group][telegram-badge]][telegram-url]
 
+[buildkite]: https://github.com/near/nearcore/actions
 [stable-release]: https://img.shields.io/github/v/release/nearprotocol/nearcore?label=stable
 [prerelease]: https://img.shields.io/github/v/release/nearprotocol/nearcore?include_prereleases&label=prerelease
 [ci-badge-master]: https://badge.buildkite.com/a81147cb62c585cc434459eedd1d25e521453120ead9ee6c64.svg?branch=master
2 changes: 1 addition & 1 deletion chain/chain/src/chain.rs
@@ -3915,7 +3915,7 @@ impl Chain {
 }
 
 /// Function to check whether we need to create a new snapshot while processing the current block
-/// Note that this functions is called as a part of block preprocesing, so the head is not updated to current block
+/// Note that this function is called as part of block preprocessing, so the head is not updated to the current block
 fn should_make_or_delete_snapshot(&mut self) -> Result<SnapshotAction, Error> {
 // head value is that of the previous block, i.e. curr_block.prev_hash
 let head = self.head()?;
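The doc comment above relies on an ordering detail: `should_make_or_delete_snapshot` runs during block preprocessing, so `self.head()` still returns the tip of the previous block. A minimal standalone sketch of that invariant, using illustrative stand-in types rather than nearcore's own:

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Hash(u64);

/// Stand-in for the chain tip; nearcore's `Tip` has more fields.
struct Tip {
    last_block_hash: Hash,
}

/// Stand-in for the block currently being preprocessed.
struct Block {
    prev_hash: Hash,
}

/// During preprocessing the head is still the previous block, so its hash
/// equals the current block's `prev_hash` (the property the comment states).
fn head_is_previous_block(head: &Tip, current: &Block) -> bool {
    head.last_block_hash == current.prev_hash
}

fn main() {
    let head = Tip { last_block_hash: Hash(41) };
    let current = Block { prev_hash: Hash(41) };
    assert!(head_is_previous_block(&head, &current));
}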
147 changes: 107 additions & 40 deletions chain/chain/src/flat_storage_resharder.rs
@@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex};
 use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle};
 use near_chain_primitives::Error;
 
-use tracing::{debug, error, info};
+use tracing::{debug, error, info, warn};
 
 use crate::resharding::event_type::{ReshardingEventType, ReshardingSplitShardParams};
 use crate::resharding::types::{
@@ -175,7 +175,7 @@ impl FlatStorageResharder {
 parent_shard,
 left_child_shard,
 right_child_shard,
-resharding_hash,
+resharding_block,
 ..
 } = split_params;
 info!(target: "resharding", ?split_params, "initiating flat storage shard split");
@@ -192,7 +192,7 @@
 left_child_shard,
 right_child_shard,
 shard_layout: shard_layout.clone(),
-resharding_hash,
+resharding_block,
 flat_head,
 };
 store_update.set_flat_storage_status(
@@ -390,11 +390,11 @@ impl FlatStorageResharder {
 let mut iter = match self.flat_storage_iterator(
 &flat_store,
 &parent_shard,
-&split_params.resharding_hash,
+&split_params.resharding_block.hash,
 ) {
 Ok(iter) => iter,
 Err(err) => {
-error!(target: "resharding", ?parent_shard, block_hash=?split_params.resharding_hash, ?err, "failed to build flat storage iterator");
+error!(target: "resharding", ?parent_shard, block_hash=?split_params.resharding_block.hash, ?err, "failed to build flat storage iterator");
 return FlatStorageReshardingTaskResult::Failed;
 }
 };
@@ -482,7 +482,7 @@ impl FlatStorageResharder {
 left_child_shard,
 right_child_shard,
 flat_head,
-resharding_hash,
+resharding_block,
 ..
 } = split_params;
 let flat_store = self.runtime.store().flat_store();
@@ -507,7 +507,7 @@
 store_update.set_flat_storage_status(
 child_shard,
 FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
-resharding_hash,
+resharding_block,
 )),
 );
 // Catchup will happen in a separate task, so send a request to schedule the
@@ -670,9 +670,8 @@ impl FlatStorageResharder {
 .flat_store()
 .get_flat_storage_status(shard_uid)
 .map_err(|e| Into::<StorageError>::into(e))?;
-let FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
-mut flat_head_block_hash,
-)) = status
+let FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(mut flat_head)) =
+status
 else {
 return Err(Error::Other(format!(
 "unexpected resharding catchup flat storage status for {}: {:?}",
@@ -686,16 +685,16 @@
 target: "resharding",
 "shard_catchup_apply_deltas/batch",
 ?shard_uid,
-?flat_head_block_hash,
+?flat_head,
 batch_id = ?num_batches_done)
 .entered();
 let chain_final_head = chain_store.final_head()?;
 
 // If we reached the desired new flat head, we can terminate the delta application step.
-if is_flat_head_on_par_with_chain(&flat_head_block_hash, &chain_final_head) {
+if is_flat_head_on_par_with_chain(&flat_head.hash, &chain_final_head) {
 return Ok(Some((
 num_batches_done,
-Tip::from_header(&chain_store.get_block_header(&flat_head_block_hash)?),
+Tip::from_header(&chain_store.get_block_header(&flat_head.hash)?),
 )));
 }
 
@@ -706,26 +705,32 @@
 
 // Merge deltas from the next blocks until we reach chain final head.
 for _ in 0..catch_up_blocks {
-let height = chain_store.get_block_height(&flat_head_block_hash)?;
 debug_assert!(
-height <= chain_final_head.height,
-"flat head: {flat_head_block_hash}"
+flat_head.height <= chain_final_head.height,
+"flat head: {:?}",
+&flat_head,
 );
 // Stop if we reached the desired new flat head.
-if is_flat_head_on_par_with_chain(&flat_head_block_hash, &chain_final_head) {
+if is_flat_head_on_par_with_chain(&flat_head.hash, &chain_final_head) {
 break;
 }
-if self.coordinate_snapshot(height) {
+if self.coordinate_snapshot(flat_head.height) {
 postpone = true;
 break;
 }
-flat_head_block_hash = chain_store.get_next_block_hash(&flat_head_block_hash)?;
+let next_hash = chain_store.get_next_block_hash(&flat_head.hash)?;
+let next_header = chain_store.get_block_header(&next_hash)?;
+flat_head = BlockInfo {
+hash: *next_header.hash(),
+height: next_header.height(),
+prev_hash: *next_header.prev_hash(),
+};
 if let Some(changes) = store
-.get_delta(shard_uid, flat_head_block_hash)
+.get_delta(shard_uid, flat_head.hash)
 .map_err(|err| Into::<StorageError>::into(err))?
 {
 merged_changes.merge(changes);
-store_update.remove_delta(shard_uid, flat_head_block_hash);
+store_update.remove_delta(shard_uid, flat_head.hash);
 }
 // TODO(resharding): if flat_head_block_hash == state sync hash -> do snapshot
 }
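The loop above exits early once `is_flat_head_on_par_with_chain` reports that the flat head caught up with the chain's final head. The diff shows only the call sites, which pass the flat head hash together with the final-head tip; a plausible sketch of such a check under that assumption (not nearcore's actual implementation):

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct CryptoHash([u8; 32]);

struct Tip {
    last_block_hash: CryptoHash,
}

/// Assumed semantics: the flat head is "on par" once it reached the final
/// head block. Only the call sites are visible in this diff.
fn is_flat_head_on_par_with_chain(flat_head_hash: &CryptoHash, final_head: &Tip) -> bool {
    *flat_head_hash == final_head.last_block_hash
}

fn main() {
    let h = CryptoHash([7; 32]);
    assert!(is_flat_head_on_par_with_chain(&h, &Tip { last_block_hash: h }));
}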
@@ -734,14 +739,12 @@
 merged_changes.apply_to_flat_state(&mut store_update, shard_uid);
 store_update.set_flat_storage_status(
 shard_uid,
-FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
-flat_head_block_hash,
-)),
+FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(flat_head)),
 );
 store_update.commit()?;
 
 num_batches_done += 1;
-metrics.set_head_height(chain_store.get_block_height(&flat_head_block_hash)?);
+metrics.set_head_height(flat_head.height);
 
 if postpone {
 return Ok(None);
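The batches above show the payoff of carrying a full `BlockInfo` (hash, height, prev_hash) instead of a bare hash: the height is available directly, without the `chain_store.get_block_height` lookups this commit deletes. A simplified, self-contained model of the same walk-forward pattern, with stand-in types rather than nearcore's:

use std::collections::HashMap;

// Stand-in for nearcore's BlockInfo; real hashes are CryptoHash, not u64.
#[derive(Clone, Copy, Debug)]
struct BlockInfo {
    hash: u64,
    height: u64,
    prev_hash: u64,
}

struct MockChain {
    // block hash -> header, plus parent hash -> child hash links
    headers: HashMap<u64, BlockInfo>,
    next: HashMap<u64, u64>,
}

impl MockChain {
    /// Advance the flat head one block, carrying the height along "for free"
    /// instead of doing a separate height lookup per iteration.
    fn advance(&self, flat_head: &BlockInfo) -> Option<BlockInfo> {
        let next_hash = self.next.get(&flat_head.hash)?;
        self.headers.get(next_hash).copied()
    }
}

fn main() {
    let genesis = BlockInfo { hash: 1, height: 0, prev_hash: 0 };
    let block2 = BlockInfo { hash: 2, height: 1, prev_hash: 1 };
    let chain = MockChain {
        headers: HashMap::from([(1, genesis), (2, block2)]),
        next: HashMap::from([(1, 2)]),
    };
    let mut flat_head = genesis;
    while let Some(next) = chain.advance(&flat_head) {
        flat_head = next; // deltas for `flat_head.hash` would be merged here
    }
    assert_eq!(flat_head.height, 1);
}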
@@ -1022,9 +1025,10 @@ fn copy_kv_to_child(
 
 // Sanity check we are truly writing to one of the expected children shards.
 if new_shard_uid != *left_child_shard && new_shard_uid != *right_child_shard {
-let err_msg = "account id doesn't map to any child shard!";
-error!(target: "resharding", ?new_shard_uid, ?left_child_shard, ?right_child_shard, ?shard_layout, ?account_id, err_msg);
-return Err(Error::ReshardingError(err_msg.to_string()));
+let err_msg = "account id doesn't map to any child shard! - skipping it";
+warn!(target: "resharding", ?new_shard_uid, ?left_child_shard, ?right_child_shard, ?shard_layout, ?account_id, err_msg);
+// Do not fail resharding. Just skip this entry.
+return Ok(());
 }
 // Add the new flat store entry.
 store_update.set(new_shard_uid, key, value);
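The hunk above turns an unmapped account from a hard failure (`Error::ReshardingError`) into a `warn!` plus a skip, so a single stray key can no longer abort the whole split. The shape of that pattern, distilled into a hypothetical standalone form with stand-in shard ids and store:

use std::collections::HashMap;

// Hypothetical distillation of the warn-and-skip pattern adopted above.
fn copy_kv_to_child(
    store: &mut HashMap<(u32, Vec<u8>), Vec<u8>>,
    mapped_shard: u32,
    children: (u32, u32),
    key: Vec<u8>,
    value: Vec<u8>,
) -> Result<(), String> {
    if mapped_shard != children.0 && mapped_shard != children.1 {
        // Previously this returned Err(...) and failed the resharding task;
        // now the entry is logged and skipped instead.
        eprintln!("warn: key maps to neither child shard, skipping");
        return Ok(());
    }
    store.insert((mapped_shard, key), value);
    Ok(())
}

fn main() {
    let mut store = HashMap::new();
    // An entry mapping to an unrelated shard (9) is skipped, not fatal.
    assert!(copy_kv_to_child(&mut store, 9, (2, 3), b"k".to_vec(), b"v".to_vec()).is_ok());
    assert!(store.is_empty());
}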
@@ -1099,7 +1103,7 @@ impl FlatStorageReshardingEventStatus {
 fn resharding_hash(&self) -> CryptoHash {
 match self {
 FlatStorageReshardingEventStatus::SplitShard(_, split_status, ..) => {
-split_status.resharding_hash
+split_status.resharding_block.hash
 }
 }
 }
@@ -1384,12 +1388,9 @@ mod tests {
 chain: &Chain,
 new_shard_layout: &ShardLayout,
 ) -> ReshardingEventType {
-ReshardingEventType::from_shard_layout(
-&new_shard_layout,
-chain.head().unwrap().last_block_hash,
-)
-.unwrap()
-.unwrap()
+ReshardingEventType::from_shard_layout(&new_shard_layout, chain.head().unwrap().into())
+.unwrap()
+.unwrap()
 }
 
 enum PreviousBlockHeight {
@@ -1524,7 +1525,11 @@
 left_child_shard,
 right_child_shard,
 shard_layout: new_shard_layout,
-resharding_hash: CryptoHash::default(),
+resharding_block: BlockInfo {
+hash: CryptoHash::default(),
+height: 2,
+prev_hash: CryptoHash::default(),
+},
 flat_head: BlockInfo {
 hash: CryptoHash::default(),
 height: 1,
@@ -1597,7 +1602,7 @@
 assert_eq!(
 flat_store.get_flat_storage_status(child),
 Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
-chain.final_head().unwrap().last_block_hash
+chain.final_head().unwrap().into()
 )))
 );
 }
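The assertion above, like several call sites in this diff, passes `chain.final_head().unwrap().into()` where a bare hash used to go, implying a conversion from the chain `Tip` into the resharding `BlockInfo`. A hypothetical sketch of such an impl, with `BlockInfo` field names taken from the literals in this diff and a simplified `Tip`; the real conversion is not shown in the hunks:

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct CryptoHash([u8; 32]);

/// Simplified stand-in for the chain tip.
struct Tip {
    height: u64,
    last_block_hash: CryptoHash,
    prev_block_hash: CryptoHash,
}

#[derive(Clone, Copy, Debug)]
struct BlockInfo {
    hash: CryptoHash,
    height: u64,
    prev_hash: CryptoHash,
}

impl From<Tip> for BlockInfo {
    fn from(tip: Tip) -> Self {
        // Assumed mapping: the tip's block becomes the resharding block.
        BlockInfo { hash: tip.last_block_hash, height: tip.height, prev_hash: tip.prev_block_hash }
    }
}

fn main() {
    let tip = Tip {
        height: 2,
        last_block_hash: CryptoHash([1; 32]),
        prev_block_hash: CryptoHash([0; 32]),
    };
    let info: BlockInfo = tip.into();
    assert_eq!(info.height, 2);
}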
@@ -2219,7 +2224,7 @@
 parent_shard,
 left_child_shard,
 right_child_shard,
-resharding_hash,
+resharding_block,
 ..
 } = match resharding_event_type.clone() {
 ReshardingEventType::SplitShard(params) => params,
@@ -2278,12 +2283,15 @@
 resharder.resharding_config,
 );
 assert!(resharder
-.resume(left_child_shard, &FlatStorageReshardingStatus::CatchingUp(resharding_hash))
+.resume(
+left_child_shard,
+&FlatStorageReshardingStatus::CatchingUp(resharding_block)
+)
 .is_ok());
 assert!(resharder
 .resume(
 right_child_shard,
-&FlatStorageReshardingStatus::CatchingUp(resharding_hash)
+&FlatStorageReshardingStatus::CatchingUp(resharding_block)
 )
 .is_ok());
 }
@@ -2554,4 +2562,63 @@
 Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }))
 );
 }
+
+/// This test asserts that resharding doesn't fail if flat storage iteration goes over an account
+/// which is not part of any child shard after the shard layout changes.
+#[test]
+fn unrelated_account_do_not_fail_splitting() {
+init_test_logger();
+let (mut chain, resharder, sender) =
+create_chain_resharder_sender::<DelayedSender>(simple_shard_layout());
+let new_shard_layout = shard_layout_after_split();
+let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout);
+let ReshardingSplitShardParams {
+parent_shard, left_child_shard, right_child_shard, ..
+} = match resharding_event_type.clone() {
+ReshardingEventType::SplitShard(params) => params,
+};
+let flat_store = resharder.runtime.store().flat_store();
+
+// Add two blocks on top of genesis. This will make the resharding block (height 0) final.
+add_blocks_to_chain(
+&mut chain,
+2,
+PreviousBlockHeight::ChainHead,
+NextBlockHeight::ChainHeadPlusOne,
+);
+
+// Inject an account which doesn't belong to the parent shard into its flat storage.
+let mut store_update = flat_store.store_update();
+let test_value = Some(FlatStateValue::Inlined(vec![0]));
+let key = TrieKey::Account { account_id: account!("ab") };
+store_update.set(parent_shard, key.to_vec(), test_value);
+store_update.commit().unwrap();
+
+// Perform resharding.
+assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());
+sender.call_split_shard_task();
+
+// Check final status of parent flat storage.
+let parent = ShardUId { version: 3, shard_id: 1 };
+assert_eq!(flat_store.get_flat_storage_status(parent), Ok(FlatStorageStatus::Empty));
+assert_eq!(flat_store.iter(parent).count(), 0);
+assert!(resharder
+.runtime
+.get_flat_storage_manager()
+.get_flat_storage_for_shard(parent)
+.is_none());
+
+// Check intermediate status of children flat storages.
+// If children reached the catching up state, it means that the split task succeeded.
+for child in [left_child_shard, right_child_shard] {
+assert_eq!(
+flat_store.get_flat_storage_status(child),
+Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
+chain.final_head().unwrap().into()
+)))
+);
+// However, the unrelated account should not end up in any child.
+assert!(flat_store.get(child, &key.to_vec()).is_ok_and(|val| val.is_none()));
+}
+}
 }
2 changes: 1 addition & 1 deletion chain/chain/src/orphan.rs
@@ -66,7 +66,7 @@ impl Orphan {
 /// 2) size of the pool exceeds MAX_ORPHAN_SIZE and the orphan was added a long time ago
 /// or the height is high
 pub struct OrphanBlockPool {
-/// A map from block hash to a orphan block
+/// A map from block hash to an orphan block
 orphans: HashMap<CryptoHash, Orphan>,
 /// A set that contains all orphans for which we have requested missing chunks for them
 /// An orphan can be added to this set when it was first added to the pool, or later
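For context, the `OrphanBlockPool` doc comments describe a hash-keyed map of orphans plus a companion set tracking which orphans already had their missing chunks requested. A minimal hypothetical sketch of that shape; only `orphans: HashMap<CryptoHash, Orphan>` is literally in the diff, the rest is illustrative:

use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct CryptoHash([u8; 32]);

struct Orphan; // stand-in for the real orphan block wrapper

// Sketch of the structure described by the doc comments above.
struct OrphanBlockPool {
    /// A map from block hash to an orphan block.
    orphans: HashMap<CryptoHash, Orphan>,
    /// Orphans for which missing chunks have already been requested.
    requested_missing_chunks: HashSet<CryptoHash>,
}

impl OrphanBlockPool {
    fn add(&mut self, hash: CryptoHash, orphan: Orphan, requested_chunks: bool) {
        self.orphans.insert(hash, orphan);
        if requested_chunks {
            self.requested_missing_chunks.insert(hash);
        }
    }
}

fn main() {
    let mut pool = OrphanBlockPool {
        orphans: HashMap::new(),
        requested_missing_chunks: HashSet::new(),
    };
    pool.add(CryptoHash([0; 32]), Orphan, true);
    assert_eq!(pool.orphans.len(), 1);
}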
