feat(resharding) - congestion info computation #12581

Merged 28 commits on Jan 7, 2025.
5 changes: 4 additions & 1 deletion chain/chain/src/flat_storage_resharder.rs
@@ -1000,7 +1000,10 @@ fn shard_split_handle_key_value(
| col::BANDWIDTH_SCHEDULER_STATE => {
copy_kv_to_all_children(&split_params, key, value, store_update)
}
col::BUFFERED_RECEIPT_INDICES | col::BUFFERED_RECEIPT => {
col::BUFFERED_RECEIPT_INDICES
| col::BUFFERED_RECEIPT
| col::BUFFERED_RECEIPT_GROUPS_QUEUE_DATA
| col::BUFFERED_RECEIPT_GROUPS_QUEUE_ITEM => {
copy_kv_to_left_child(&split_params, key, value, store_update)
}
_ => unreachable!("key: {:?} should not appear in flat store!", key),
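For orientation, here is a minimal sketch of the two copy helpers used above (hypothetical types and signatures, not the real ones in `flat_storage_resharder.rs`, which operate on flat-store update batches and the split parameters): data both children need is duplicated, while buffered-receipt data stays with the left child.

```rust
// Hypothetical simplification for illustration only; the real code uses
// near_store's ShardUId and flat-store update adapters.
struct SplitParams {
    left_child: u32,
    right_child: u32,
}
struct StoreUpdate; // stand-in for the flat-store update batch

impl StoreUpdate {
    fn put(&mut self, shard: u32, key: &[u8], value: &[u8]) {
        println!("shard {shard}: {key:?} -> {value:?}");
    }
}

// Columns both children must read (e.g. delayed receipts, bandwidth
// scheduler state) are written under both child shards.
fn copy_kv_to_all_children(p: &SplitParams, key: &[u8], value: &[u8], u: &mut StoreUpdate) {
    u.put(p.left_child, key, value);
    u.put(p.right_child, key, value);
}

// Buffered receipts and (with this PR) the receipt-group queue columns
// go to the left child only; the right child starts with none.
fn copy_kv_to_left_child(p: &SplitParams, key: &[u8], value: &[u8], u: &mut StoreUpdate) {
    u.put(p.left_child, key, value);
}
```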
140 changes: 133 additions & 7 deletions chain/chain/src/resharding/manager.rs
@@ -1,3 +1,4 @@
use std::cell::RefCell;
use std::io;
use std::sync::Arc;

@@ -6,20 +7,23 @@ use super::types::ReshardingSender;
use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageResharderController};
use crate::types::RuntimeAdapter;
use crate::ChainStoreUpdate;
use itertools::Itertools;
use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle};
use near_chain_primitives::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_primitives::block::Block;
use near_primitives::challenge::PartialState;
use near_primitives::congestion_info::CongestionInfo;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout};
use near_primitives::types::chunk_extra::ChunkExtra;
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::flat::BlockInfo;
use near_store::trie::mem::mem_trie_update::TrackingMode;
use near_store::trie::ops::resharding::RetainMode;
use near_store::trie::outgoing_metadata::ReceiptGroupsQueue;
use near_store::trie::TrieRecorder;
use near_store::{DBCol, ShardTries, ShardUId, Store};
use near_store::{DBCol, ShardTries, ShardUId, Store, TrieAccess};

pub struct ReshardingManager {
store: Store,
@@ -183,7 +187,7 @@ impl ReshardingManager {
// blocks, the second finalization will crash.
tries.freeze_mem_tries(parent_shard_uid, split_shard_event.children_shards())?;

let chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?;
let parent_chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?;
let boundary_account = split_shard_event.boundary_account;

let mut trie_store_update = self.store.store_update();
@@ -210,20 +214,48 @@
let mut mem_tries = mem_tries.write().unwrap();
let mut trie_recorder = TrieRecorder::new();
let mode = TrackingMode::RefcountsAndAccesses(&mut trie_recorder);
let mem_trie_update = mem_tries.update(*chunk_extra.state_root(), mode)?;
let mem_trie_update = mem_tries.update(*parent_chunk_extra.state_root(), mode)?;

let trie_changes = mem_trie_update.retain_split_shard(&boundary_account, retain_mode);
let partial_storage = trie_recorder.recorded_storage();
let mem_changes = trie_changes.mem_trie_changes.as_ref().unwrap();
let new_state_root = mem_tries.apply_memtrie_changes(block_height, mem_changes);
drop(mem_tries);
Review thread on `drop(mem_tries)`:

Author: This is annoying but needed to prevent a deadlock.

Reviewer: Sounds a bit scary 👀, but I assume it has to be this way.

Author: It's a bit ugly but I don't think it's scary. Rust is smart enough to know not to reuse this variable anymore. Unless you had some other risk in mind?
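A minimal sketch of the hazard the early `drop` avoids (assumed shape only; the real code keeps the memtries behind a lock and later re-enters code that locks them again):

```rust
use std::sync::{Arc, RwLock};

// Sketch, not the nearcore implementation: re-locking an RwLock on the
// same thread while its write guard is still alive may deadlock or
// panic (per the std::sync::RwLock docs), so the guard is dropped first.
fn sketch(mem_tries: &Arc<RwLock<Vec<u32>>>) {
    let mut guard = mem_tries.write().unwrap();
    guard.push(1);
    drop(guard); // release before any path that locks `mem_tries` again
    let _len = mem_tries.read().unwrap().len(); // safe: guard already gone
}
```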


// Get the congestion info for the child.
let parent_epoch_id = block.header().epoch_id();
let parent_shard_layout = self.epoch_manager.get_shard_layout(&parent_epoch_id)?;
let parent_state_root = *parent_chunk_extra.state_root();
let parent_trie = tries.get_trie_for_shard(parent_shard_uid, parent_state_root);
let parent_congestion_info =
parent_chunk_extra.congestion_info().expect("The congestion info must exist!");

let trie_recorder = RefCell::new(trie_recorder);
let parent_trie = parent_trie.recording_reads_with_recorder(trie_recorder);

let child_epoch_id = self.epoch_manager.get_next_epoch_id(block.hash())?;
let child_shard_layout = self.epoch_manager.get_shard_layout(&child_epoch_id)?;
let child_congestion_info = Self::get_child_congestion_info(
&parent_trie,
&parent_shard_layout,
parent_congestion_info,
&child_shard_layout,
new_shard_uid,
retain_mode,
)?;

let trie_recorder = parent_trie.take_recorder().unwrap();
let partial_storage = trie_recorder.borrow_mut().recorded_storage();
let partial_state_len = match &partial_storage.nodes {
PartialState::TrieValues(values) => values.len(),
};
let mem_changes = trie_changes.mem_trie_changes.as_ref().unwrap();
let new_state_root = mem_tries.apply_memtrie_changes(block_height, mem_changes);

// TODO(resharding): set all fields of `ChunkExtra`. Consider stronger
// typing. Clarify where it should happen when `State` and
// `FlatState` update is implemented.
let mut child_chunk_extra = ChunkExtra::clone(&chunk_extra);
let mut child_chunk_extra = ChunkExtra::clone(&parent_chunk_extra);
*child_chunk_extra.state_root_mut() = new_state_root;
*child_chunk_extra.congestion_info_mut().expect("The congestion info must exist!") =
child_congestion_info;

chain_store_update.save_chunk_extra(block_hash, &new_shard_uid, child_chunk_extra);
chain_store_update.save_state_transition_data(
@@ -256,6 +288,100 @@ impl ReshardingManager {
Ok(())
}

pub fn get_child_congestion_info(
parent_trie: &dyn TrieAccess,
parent_shard_layout: &ShardLayout,
parent_congestion_info: CongestionInfo,
child_shard_layout: &ShardLayout,
child_shard_uid: ShardUId,
retain_mode: RetainMode,
) -> Result<CongestionInfo, Error> {
// Get the congestion info based on the parent shard.
let mut child_congestion_info = Self::get_child_congestion_info_not_finalized(
parent_trie,
&parent_shard_layout,
parent_congestion_info,
retain_mode,
)?;

// Set the allowed shard based on the child shard.
Self::finalize_allowed_shard(
&child_shard_layout,
child_shard_uid,
&mut child_congestion_info,
)?;

Ok(child_congestion_info)
}

// Get the congestion info for the child shard. The congestion info can be
// inferred efficiently from the combination of the parent shard's
// congestion info and the receipt group metadata, which is available in the
// parent shard's trie.
fn get_child_congestion_info_not_finalized(
parent_trie: &dyn TrieAccess,
parent_shard_layout: &ShardLayout,
parent_congestion_info: CongestionInfo,
retain_mode: RetainMode,
) -> Result<CongestionInfo, Error> {
// The left child contains all the delayed and buffered receipts from the
// parent so it should have identical congestion info.
if retain_mode == RetainMode::Left {
return Ok(parent_congestion_info);
}

// The right child contains all the delayed receipts from the parent but it
// has no buffered receipts. Its info needs to be computed by subtracting
// the parent's buffered receipts from the parent's congestion info.
let mut congestion_info = parent_congestion_info;
for shard_id in parent_shard_layout.shard_ids() {
let receipt_groups = ReceiptGroupsQueue::load(parent_trie, shard_id)?;
Review thread on `ReceiptGroupsQueue::load`:

jancionear (reviewer, Jan 2, 2025): I wonder if there's some way to protect against a scenario where the ReceiptGroupsQueue isn't fully initialized. The old receipts will most likely be forwarded within one epoch, but in theory there could still be some receipts not included in the metadata when the resharding happens. Could we detect that and postpone the protocol upgrade or something? Idk, sounds complicated.

Author: I think it's borderline impossible that we wouldn't go through the queue for a full epoch. We do have some assertions in place to check that congestion info is zeroed in the right places iff the queue is empty, so at least we'll detect the issue some time later. I don't think delaying is feasible, as resharding is scheduled an epoch in advance and we only learn about this condition at the last moment.

jancionear: Maybe it'd be possible to bootstrap congestion info? Idk, not ideal either.

Author: Yeah, cool idea. It could work, but if we're swamped with receipts that could blow up the state witness. I still don't think it's necessary to worry about this case.
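The assertion the author mentions is, in spirit, an iff-check between the queue metadata and the congestion counters. A hedged sketch of that invariant (hypothetical helper and parameter names, not the exact nearcore assertion):

```rust
// Hypothetical illustration of the invariant: the buffered-receipt
// counters in the congestion info must be zero exactly when the
// receipt-group queue carries no metadata for the target shard.
fn assert_queue_matches_congestion(
    queue_total_gas: u128,
    queue_total_bytes: u64,
    buffered_receipts_gas: u128,
    buffered_receipt_bytes: u64,
) {
    let queue_empty = queue_total_gas == 0 && queue_total_bytes == 0;
    let counters_zero = buffered_receipts_gas == 0 && buffered_receipt_bytes == 0;
    assert_eq!(
        queue_empty, counters_zero,
        "receipt-group metadata and congestion info disagree"
    );
}
```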

let Some(receipt_groups) = receipt_groups else {
continue;
};

let bytes = receipt_groups.total_size();
let gas = receipt_groups.total_gas();

congestion_info
Review thread:

Reviewer: do we not have to set total_receipts_num as well?

Author: Good sanity check! The total_receipts_num field is part of ReceiptGroupsQueueData, which is resharded by copying to the left child only. It is handled by the memtrie and flat storage split operations. MemTrie is already implemented and this PR adds the flat storage equivalent.

.remove_buffered_receipt_gas(gas)
.expect("Buffered gas must not exceed congestion info buffered gas");
congestion_info
.remove_receipt_bytes(bytes)
.expect("Buffered size must not exceed congestion info buffered size");
}

// The right child does not inherit any buffered receipts. The
// congestion info must match this invariant.
assert_eq!(congestion_info.buffered_receipts_gas(), 0);

Ok(congestion_info)
}

fn finalize_allowed_shard(
child_shard_layout: &ShardLayout,
child_shard_uid: ShardUId,
congestion_info: &mut CongestionInfo,
) -> Result<(), Error> {
let all_shards = child_shard_layout.shard_ids().collect_vec();
let own_shard = child_shard_uid.shard_id();
let own_shard_index = child_shard_layout
.get_shard_index(own_shard)?
.try_into()
.expect("ShardIndex must fit in u64");
// Please note that the congestion seed used during resharding is
// different than the one used during normal operation. In runtime the
// seed is set to the sum of shard index and block height. The block
// height isn't easily available on all call sites which is why the
// simplified seed is used. This is valid because it's deterministic and
// resharding is a very rare event. However in a perfect world it should
// be the same.
// TODO - Use proper congestion control seed during resharding.
let congestion_seed = own_shard_index;
Review thread on the congestion seed:

Reviewer: It would be nice to use the same seed as usual (derived from block height), but I guess it doesn't matter, as long as it's deterministic and only on resharding boundaries.

Author: Yeah, the issue was that I couldn't get hold of the block height easily on the stateless validation path. Let me add a TODO, but for now, as you said, as long as it's deterministic and one-off it's fine.

Reviewer: Quick question: doesn't this mess up the coordination of allowed shard with respect to other shards? For example, suppose we had 5 shards and a round-robin method to assign allowed_shard, and say right before resharding we had the mapping:

Shard 0 -> Shard 3
Shard 1 -> Shard 4
Shard 2 -> Shard 0
Shard 3 -> Shard 1
Shard 4 -> Shard 2

After resharding we split Shard 4 into Shard 5 and Shard 6 (indices 4 and 5). If we break the coordination and just assign the allowed shard based on congestion_seed = own_shard_index, wouldn't that potentially lead to two shards having the same mapped allowed_shard? Or is this issue just for the one block after resharding, with life returning to normal afterwards?

Author: There shouldn't be any duplicated allowed shards, but please don't ask about missing chunks. If I'm reading the code right, using just the shard index means that each shard will have itself as the allowed shard.

Author: And yeah, it's only for one block during resharding.
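To make the reviewer's example concrete, a sketch under the assumption that the allowed shard is picked round-robin as `all_shards[seed % n]` (a hypothetical simplification; the real formula lives in `CongestionInfo::finalize_allowed_shard`): with `seed = own_shard_index`, every shard selects itself, so no two shards collide.

```rust
// Assumed round-robin of the form all_shards[seed % n]. With
// seed = own shard index, each shard maps to itself, so the one-off
// resharding assignment stays collision-free.
fn allowed_shard(all_shards: &[u64], seed: u64) -> u64 {
    all_shards[seed as usize % all_shards.len()]
}

fn main() {
    // Shard 4 was split into shards 5 and 6 (indices 4 and 5).
    let shards: Vec<u64> = vec![0, 1, 2, 3, 5, 6];
    for (index, &shard) in shards.iter().enumerate() {
        // seed = own shard index => each shard is its own allowed shard
        assert_eq!(allowed_shard(&shards, index as u64), shard);
    }
}
```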

congestion_info.finalize_allowed_shard(own_shard, &all_shards, congestion_seed);
Ok(())
}

// TODO(store): Use proper store interface
fn get_chunk_extra(
&self,
6 changes: 3 additions & 3 deletions chain/chain/src/runtime/mod.rs
@@ -650,7 +650,7 @@ impl RuntimeAdapter for NightshadeRuntime {
if ProtocolFeature::StatelessValidation.enabled(next_protocol_version)
|| cfg!(feature = "shadow_chunk_validation")
{
trie = trie.recording_reads();
trie = trie.recording_reads_new_recorder();
}
let mut state_update = TrieUpdate::new(trie);

Expand Down Expand Up @@ -799,7 +799,7 @@ impl RuntimeAdapter for NightshadeRuntime {
}
}
}
debug!(target: "runtime", "Transaction filtering results {} valid out of {} pulled from the pool", result.transactions.len(), num_checked_transactions);
debug!(target: "runtime", limited_by=?result.limited_by, "Transaction filtering results {} valid out of {} pulled from the pool", result.transactions.len(), num_checked_transactions);
let shard_label = shard_id.to_string();
metrics::PREPARE_TX_SIZE.with_label_values(&[&shard_label]).observe(total_size as f64);
metrics::PREPARE_TX_REJECTED
@@ -882,7 +882,7 @@ impl RuntimeAdapter for NightshadeRuntime {
if ProtocolFeature::StatelessValidation.enabled(next_protocol_version)
|| cfg!(feature = "shadow_chunk_validation")
{
trie = trie.recording_reads();
trie = trie.recording_reads_new_recorder();
}

match self.process_state_update(
42 changes: 33 additions & 9 deletions chain/chain/src/stateless_validation/chunk_validation.rs
@@ -4,6 +4,7 @@ use crate::chain::{
};
use crate::rayon_spawner::RayonAsyncComputationSpawner;
use crate::resharding::event_type::ReshardingEventType;
use crate::resharding::manager::ReshardingManager;
use crate::sharding::shuffle_receipt_proofs;
use crate::stateless_validation::processing_tracker::ProcessingDoneTracker;
use crate::store::filter_incoming_receipts_for_shard;
@@ -96,6 +97,7 @@ pub fn validate_prepared_transactions(
) -> Result<PreparedTransactions, Error> {
let parent_block = chain.chain_store().get_block(chunk_header.prev_block_hash())?;
let last_chunk_transactions_size = borsh::to_vec(last_chunk_transactions)?.len();
tracing::info!("boom prepare_transactions from validator");
runtime_adapter.prepare_transactions(
storage_config,
crate::types::PrepareTransactionsChunkContext {
@@ -690,7 +692,7 @@ pub fn validate_chunk_state_witness(
.into_iter()
.zip(state_witness.implicit_transitions.into_iter())
{
let (shard_uid, new_state_root) = match implicit_transition_params {
let (shard_uid, new_state_root, new_congestion_info) = match implicit_transition_params {
ImplicitTransitionParams::ApplyOldChunk(block, shard_uid) => {
let shard_context = ShardContext { shard_uid, should_apply_chunk: false };
let old_chunk_data = OldChunkData {
@@ -710,25 +712,47 @@
shard_context,
runtime_adapter,
)?;
(shard_uid, apply_result.new_root)
let congestion_info =
chunk_extra.congestion_info().expect("The congestion info must exist!");
(shard_uid, apply_result.new_root, congestion_info)
}
ImplicitTransitionParams::Resharding(
boundary_account,
retain_mode,
child_shard_uid,
) => {
let old_root = *chunk_extra.state_root();
let trie = Trie::from_recorded_storage(
PartialStorage { nodes: transition.base_state },
old_root,
true,
);
let new_root = trie.retain_split_shard(&boundary_account, retain_mode)?;
(child_shard_uid, new_root)
let partial_storage = PartialStorage { nodes: transition.base_state };
let parent_trie = Trie::from_recorded_storage(partial_storage, old_root, true);

// Update the congestion info based on the parent shard. It's
// important to do this step before the `retain_split_shard`
// because only the parent trie has the needed information.
let epoch_id = epoch_manager.get_epoch_id(&block_hash)?;
let parent_shard_layout = epoch_manager.get_shard_layout(&epoch_id)?;
let parent_congestion_info =
chunk_extra.congestion_info().expect("The congestion info must exist");

let child_epoch_id = epoch_manager.get_next_epoch_id(&block_hash)?;
let child_shard_layout = epoch_manager.get_shard_layout(&child_epoch_id)?;
let child_congestion_info = ReshardingManager::get_child_congestion_info(
&parent_trie,
&parent_shard_layout,
parent_congestion_info,
&child_shard_layout,
child_shard_uid,
retain_mode,
)?;

let new_root = parent_trie.retain_split_shard(&boundary_account, retain_mode)?;

(child_shard_uid, new_root, child_congestion_info)
}
};

*chunk_extra.state_root_mut() = new_state_root;
*chunk_extra.congestion_info_mut().expect("The congestion info must exist!") =
new_congestion_info;
if chunk_extra.state_root() != &transition.post_state_root {
// This is an early check, it's not for correctness, only for better
// error reporting in case of an invalid state witness due to a bug.
4 changes: 3 additions & 1 deletion chain/client/src/client.rs
@@ -1072,6 +1072,7 @@ impl Client {
} else {
0
};
tracing::info!("boom prepare_transactions from client");
runtime.prepare_transactions(
storage_config,
PrepareTransactionsChunkContext {
@@ -2255,7 +2256,8 @@ impl Client {
validators.remove(account_id);
}
for validator in validators {
trace!(target: "client", me = ?signer.as_ref().map(|bp| bp.validator_id()), ?tx, ?validator, ?shard_id, "Routing a transaction");
let tx_hash = tx.get_hash();
trace!(target: "client", me = ?signer.as_ref().map(|bp| bp.validator_id()), ?tx_hash, ?validator, ?shard_id, "Routing a transaction");

// Send message to network to actually forward transaction.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
29 changes: 25 additions & 4 deletions core/primitives-core/src/version.rs
@@ -272,12 +272,18 @@ impl ProtocolFeature {
// TODO(#11201): When stabilizing this feature in mainnet, also remove the temporary code
// that always enables this for mocknet (see config_mocknet function).
ProtocolFeature::ShuffleShardAssignments => 143,
// CurrentEpochStateSync must be enabled before ReshardingV3! When
// releasing this feature please make sure to schedule separate
// protocol upgrades for those features!
ProtocolFeature::CurrentEpochStateSync => 144,
ProtocolFeature::SimpleNightshadeV4 => 145,
// BandwidthScheduler must be enabled before ReshardingV3! When
// releasing this feature please make sure to schedule separate
// protocol upgrades for those features!
ProtocolFeature::BandwidthScheduler => 145,
ProtocolFeature::SimpleNightshadeV4 => 146,
#[cfg(feature = "protocol_feature_relaxed_chunk_validation")]
ProtocolFeature::RelaxedChunkValidation => 146,
ProtocolFeature::ExcludeExistingCodeFromWitnessForCodeLen => 147,
ProtocolFeature::BandwidthScheduler => 148,
ProtocolFeature::RelaxedChunkValidation => 147,
ProtocolFeature::ExcludeExistingCodeFromWitnessForCodeLen => 148,
ProtocolFeature::BlockHeightForReceiptId => 149,
// Place features that are not yet in Nightly below this line.
}
@@ -345,3 +351,18 @@
}
}};
}

#[cfg(test)]
mod tests {
use super::ProtocolFeature;

#[test]
fn test_resharding_dependencies() {
let state_sync = ProtocolFeature::CurrentEpochStateSync.protocol_version();
let bandwidth_scheduler = ProtocolFeature::BandwidthScheduler.protocol_version();
let resharding_v3 = ProtocolFeature::SimpleNightshadeV4.protocol_version();

assert!(state_sync < resharding_v3);
assert!(bandwidth_scheduler < resharding_v3);
}
}