Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into resharding-sync-shards
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelo-gonzalez committed Jan 6, 2025
2 parents 7f5d2f9 + 445ee6f commit 6791325
Show file tree
Hide file tree
Showing 19 changed files with 277 additions and 42 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ to pay for the storage of their accounts.
`sum(rate(near_peer_message_received_by_type_total{...}[5m]))`.
[#7548](https://github.com/near/nearcore/pull/7548)
* Few changes to `view_state` JSON RPC query:
- The requset has now an optional `include_proof` argument. When set to
- The request has now an optional `include_proof` argument. When set to
`true`, response’s `proof` will be populated.
- The `proof` within each value in `values` list of a `view_state` response is
now deprecated and will be removed in the future. Client code should ignore
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- Setting defaul values to enable `not null` by filling values for existing
-- Setting default values to enable `not null` by filling values for existing
-- rows.
alter table ft_transfers
add column initiator text not null default 'crt-benchmarks',
Expand Down
5 changes: 2 additions & 3 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2023,7 +2023,6 @@ impl Chain {
tracing::debug!(target: "chain", ?shard_id, need_storage_update, "Updating storage");

if need_storage_update {
// TODO(resharding): consider adding to catchup flow.
self.resharding_manager.start_resharding(
self.chain_store.store_update(),
&block,
Expand Down Expand Up @@ -3155,8 +3154,8 @@ impl Chain {
}
blocks_catch_up_state.done_blocks.push(queued_block);
}
Err(_) => {
error!("Error processing block during catch up, retrying");
Err(err) => {
error!(target: "chain", ?err, "Error processing block during catch up, retrying");
blocks_catch_up_state.pending_blocks.push(queued_block);
}
}
Expand Down
10 changes: 7 additions & 3 deletions chain/chain/src/resharding/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use near_primitives::challenge::PartialState;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout};
use near_primitives::types::chunk_extra::ChunkExtra;
use near_store::adapter::trie_store::get_shard_uid_mapping;
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::flat::BlockInfo;
use near_store::trie::mem::mem_trie_update::TrackingMode;
Expand Down Expand Up @@ -124,7 +125,7 @@ impl ReshardingManager {
return Ok(());
}

// Reshard the State column by setting ShardUId mapping from children to parent.
// Reshard the State column by setting ShardUId mapping from children to ancestor.
self.set_state_shard_uid_mapping(&split_shard_event)?;

// Create temporary children memtries by freezing parent memtrie and referencing it.
Expand All @@ -146,16 +147,19 @@ impl ReshardingManager {
}

/// Store in the database the mapping of ShardUId from children to the parent shard,
/// so that subsequent accesses to the State will use the parent shard's UId as a prefix for the database key.
/// so that subsequent accesses to the State will use the ancestor's ShardUId prefix
/// as a prefix for the database key.
// TODO(resharding) add testloop where grandparent ShardUId is used
fn set_state_shard_uid_mapping(
&mut self,
split_shard_event: &ReshardingSplitShardParams,
) -> io::Result<()> {
let mut store_update = self.store.trie_store().store_update();
let parent_shard_uid = split_shard_event.parent_shard;
let parent_shard_uid_prefix = get_shard_uid_mapping(&self.store, parent_shard_uid);
// TODO(resharding) No need to set the mapping for children shards that we won't track just after resharding?
for child_shard_uid in split_shard_event.children_shards() {
store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid);
store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid_prefix);
}
store_update.commit()
}
Expand Down
3 changes: 2 additions & 1 deletion chain/chain/src/runtime/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl NightshadeRuntime {
runtime_config_store: Option<RuntimeConfigStore>,
trie_config: TrieConfig,
state_snapshot_type: StateSnapshotType,
gc_num_epochs_to_keep: u64,
) -> Arc<Self> {
Self::new(
store,
Expand All @@ -57,7 +58,7 @@ impl NightshadeRuntime {
None,
None,
runtime_config_store,
DEFAULT_GC_NUM_EPOCHS_TO_KEEP,
gc_num_epochs_to_keep,
trie_config,
StateSnapshotConfig {
state_snapshot_type,
Expand Down
13 changes: 9 additions & 4 deletions chain/client/src/sync/state/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use super::downloader::StateSyncDownloader;
use super::task_tracker::TaskTracker;
use crate::metrics;
use crate::sync::state::chain_requests::ChainFinalizationRequest;
use crate::sync::state::util::query_epoch_id_and_height_for_block;
use futures::{StreamExt, TryStreamExt};
use near_async::futures::{FutureSpawner, FutureSpawnerExt};
use near_async::messaging::AsyncSender;
Expand All @@ -15,6 +14,7 @@ use near_primitives::sharding::ShardChunk;
use near_primitives::state_part::PartId;
use near_primitives::state_sync::StatePartKey;
use near_primitives::types::{EpochId, ShardId};
use near_primitives::version::PROTOCOL_VERSION;
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::flat::{FlatStorageReadyStatus, FlatStorageStatus};
use near_store::{DBCol, ShardUId, Store};
Expand Down Expand Up @@ -162,8 +162,6 @@ pub(super) async fn run_state_sync_for_shard(
return_if_cancelled!(cancel);
// Create flat storage.
{
let (epoch_id, _) = query_epoch_id_and_height_for_block(&store, sync_hash)?;
let shard_uid = epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
let chunk = header.cloned_chunk();
let block_hash = chunk.prev_block();

Expand All @@ -180,8 +178,15 @@ pub(super) async fn run_state_sync_for_shard(
// Load memtrie.
{
let handle = computation_task_tracker.get_handle(&format!("shard {}", shard_id)).await;
let head_protocol_version = epoch_manager.get_epoch_protocol_version(&epoch_id)?;
let shard_uids_pending_resharding = epoch_manager
.get_shard_uids_pending_resharding(head_protocol_version, PROTOCOL_VERSION)?;
handle.set_status("Loading memtrie");
runtime.get_tries().load_mem_trie_on_catchup(&shard_uid, &state_root)?;
runtime.get_tries().load_mem_trie_on_catchup(
&shard_uid,
&state_root,
&shard_uids_pending_resharding,
)?;
}

return_if_cancelled!(cancel);
Expand Down
4 changes: 3 additions & 1 deletion chain/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::sync::mpsc;

use near_chain_configs::GenesisValidationMode;
pub use near_primitives;
use near_primitives::types::Gas;
use near_primitives::types::{Finality, Gas};
pub use nearcore::{get_default_home, init_configs, NearConfig};

pub use near_indexer_primitives::{
Expand Down Expand Up @@ -83,6 +83,8 @@ pub struct IndexerConfig {
pub sync_mode: SyncModeEnum,
/// Whether await for node to be synced or not
pub await_for_node_synced: AwaitForNodeSyncedEnum,
/// Finality level at which blocks are streamed
pub finality: Finality,
/// Tells whether to validate the genesis file before starting
pub validate_genesis: bool,
}
Expand Down
3 changes: 2 additions & 1 deletion chain/indexer/src/streamer/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ pub(crate) async fn fetch_status(
/// entire block or we already fetched this block.
pub(crate) async fn fetch_latest_block(
client: &Addr<near_client::ViewClientActor>,
finality: &near_primitives::types::Finality,
) -> Result<views::BlockView, FailedToFetchData> {
tracing::debug!(target: INDEXER, "Fetching latest block");
client
.send(
near_client::GetBlock(near_primitives::types::BlockReference::Finality(
near_primitives::types::Finality::Final,
finality.clone(),
))
.with_span_context(),
)
Expand Down
13 changes: 7 additions & 6 deletions chain/indexer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ static DELAYED_LOCAL_RECEIPTS_CACHE: std::sync::LazyLock<
Arc<RwLock<HashMap<CryptoHash, views::ReceiptView>>>,
> = std::sync::LazyLock::new(|| Arc::new(RwLock::new(HashMap::new())));

const INTERVAL: Duration = Duration::from_millis(500);
const INTERVAL: Duration = Duration::from_millis(250);

/// Blocks #47317863 and #47317864 with restored receipts.
const PROBLEMATIC_BLOCKS: [CryptoHash; 2] = [
Expand Down Expand Up @@ -413,11 +413,12 @@ pub(crate) async fn start(
AwaitForNodeSyncedEnum::StreamWhileSyncing => {}
};

let block = if let Ok(block) = fetch_latest_block(&view_client).await {
block
} else {
continue;
};
let block =
if let Ok(block) = fetch_latest_block(&view_client, &indexer_config.finality).await {
block
} else {
continue;
};

let latest_block_height = block.header.height;
let start_syncing_block_height = if let Some(last_synced_block_height) =
Expand Down
6 changes: 5 additions & 1 deletion core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,16 @@ impl ShardTries {

/// Loads in-memory trie upon catchup, if it is enabled.
/// Requires state root because `ChunkExtra` is not available at the time mem-trie is being loaded.
/// Mem-tries of shards that are pending resharding must be loaded in any case.
pub fn load_mem_trie_on_catchup(
&self,
shard_uid: &ShardUId,
state_root: &StateRoot,
shard_uids_pending_resharding: &HashSet<ShardUId>,
) -> Result<(), StorageError> {
if !self.0.trie_config.load_mem_tries_for_tracked_shards {
if !self.0.trie_config.load_mem_tries_for_tracked_shards
&& !shard_uids_pending_resharding.contains(shard_uid)
{
return Ok(());
}
// It should not happen that memtrie is already loaded for a shard
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/src/test_loop/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ impl TestLoopBuilder {
self.runtime_config_store.clone(),
TrieConfig::from_store_config(&store_config),
StateSnapshotType::EveryEpoch,
client_config.gc.gc_num_epochs_to_keep,
);

let state_snapshot = StateSnapshotActor::new(
Expand Down Expand Up @@ -671,6 +672,7 @@ impl TestLoopBuilder {
self.runtime_config_store.clone(),
TrieConfig::from_store_config(&store_config),
StateSnapshotType::EveryEpoch,
client_config.gc.gc_num_epochs_to_keep,
);
(view_epoch_manager, view_shard_tracker, view_runtime_adapter)
} else {
Expand Down
Loading

0 comments on commit 6791325

Please sign in to comment.