test(resharding): base resharding scenario state cleanup (#12678)
A test for cleanup after resharding.
Adds an extra client without a role that initially tracks the parent
shard, then a child shard, and then an unrelated shard.
After a few epochs, only the last tracked shard should reside in State;
currently the test fails because State also contains entries from the
first tracked shard.

**Changes**
- Fix a bug in the test infra: pass `gc_num_epochs_to_keep` to the runtime.
- Add a `tracked_shard_schedule` option to the resharding testloop in order
to test the cleanup.
- Add the cleanup test itself (ignored for now).
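
For orientation, a minimal sketch of how a test wires up the new option. It mirrors the `test_resharding_v3_state_cleanup` test added below; the `parent_shard_id`, `child_shard_id`, and `unrelated_shard_id` bindings are placeholders derived from the shard layouts.

```rust
// Sketch only: the shard id bindings are placeholders; the builder methods
// and the `TrackedShardSchedule` fields are the ones introduced in this diff.
let tracked_shard_schedule = TrackedShardSchedule {
    client_index: 7, // the extra client that has no other role
    // One Vec<ShardId> per epoch height: the parent shard before resharding,
    // a child shard after it, then an unrelated shard until the test ends.
    schedule: shard_sequence_to_schedule(vec![
        parent_shard_id,
        parent_shard_id,
        child_shard_id,
        unrelated_shard_id,
    ]),
};
TestReshardingParametersBuilder::default()
    .num_clients(8)
    .tracked_shard_schedule(Some(tracked_shard_schedule.clone()))
    .add_loop_action(check_state_cleanup_after_resharding(tracked_shard_schedule))
    .build();
```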
staffik authored Jan 3, 2025
1 parent d95855a commit 77b4ab2
Showing 5 changed files with 230 additions and 16 deletions.
chain/chain/src/runtime/test_utils.rs (2 additions, 1 deletion)
@@ -48,6 +48,7 @@ impl NightshadeRuntime {
runtime_config_store: Option<RuntimeConfigStore>,
trie_config: TrieConfig,
state_snapshot_type: StateSnapshotType,
gc_num_epochs_to_keep: u64,
) -> Arc<Self> {
Self::new(
store,
@@ -57,7 +58,7 @@ impl NightshadeRuntime {
None,
None,
runtime_config_store,
DEFAULT_GC_NUM_EPOCHS_TO_KEEP,
gc_num_epochs_to_keep,
trie_config,
StateSnapshotConfig {
state_snapshot_type,
integration-tests/src/test_loop/builder.rs (2 additions, 0 deletions)
@@ -593,6 +593,7 @@ impl TestLoopBuilder {
self.runtime_config_store.clone(),
TrieConfig::from_store_config(&store_config),
StateSnapshotType::EveryEpoch,
client_config.gc.gc_num_epochs_to_keep,
);

let state_snapshot = StateSnapshotActor::new(
@@ -671,6 +672,7 @@ impl TestLoopBuilder {
self.runtime_config_store.clone(),
TrieConfig::from_store_config(&store_config),
StateSnapshotType::EveryEpoch,
client_config.gc.gc_num_epochs_to_keep,
);
(view_epoch_manager, view_shard_tracker, view_runtime_adapter)
} else {
integration-tests/src/test_loop/tests/resharding_v3.rs (109 additions, 13 deletions)
@@ -21,8 +21,8 @@ use crate::test_loop::utils::receipts::{
#[cfg(feature = "test_features")]
use crate::test_loop::utils::resharding::fork_before_resharding_block;
use crate::test_loop::utils::resharding::{
call_burn_gas_contract, call_promise_yield, execute_money_transfers,
temporary_account_during_resharding,
call_burn_gas_contract, call_promise_yield, check_state_cleanup_after_resharding,
execute_money_transfers, temporary_account_during_resharding, TrackedShardSchedule,
};
use crate::test_loop::utils::sharding::print_and_assert_shard_accounts;
use crate::test_loop::utils::transactions::{
@@ -45,6 +45,18 @@ const DEFAULT_EPOCH_LENGTH: u64 = 6;
/// and later we would hit the `DBNotFoundErr("Transaction ...)` error in tests.
const INCREASED_EPOCH_LENGTH: u64 = 8;

/// Garbage collection window length.
const GC_NUM_EPOCHS_TO_KEEP: u64 = 3;

/// Maximum number of epochs under which the test should finish.
const TESTLOOP_NUM_EPOCHS_TO_WAIT: u64 = 8;

/// Default shard layout version used in resharding tests.
const DEFAULT_SHARD_LAYOUT_VERSION: u64 = 2;

/// Account used in resharding tests as a split boundary.
const NEW_BOUNDARY_ACCOUNT: &str = "account6";

#[derive(derive_builder::Builder)]
#[builder(pattern = "owned", build_fn(skip))]
#[allow(unused)]
@@ -84,6 +96,12 @@ struct TestReshardingParameters {
chunk_ranges_to_drop: HashMap<ShardIndex, std::ops::Range<i64>>,
shuffle_shard_assignment_for_chunk_producers: bool,
track_all_shards: bool,
// Manually specify which shards will be tracked for a given client ID.
// The client ID must not be used for any other role (validator, RPC, etc.).
// The schedule length must be greater than `TESTLOOP_NUM_EPOCHS_TO_WAIT` so that it covers all epoch heights used in the test.
// The suffix must consist of `GC_NUM_EPOCHS_TO_KEEP` repetitions of the same shard,
// so that we can assert at the end of the test that the state of all other shards has been cleaned up.
tracked_shard_schedule: Option<TrackedShardSchedule>,
load_mem_tries_for_tracked_shards: bool,
/// Custom behavior executed at every iteration of test loop.
#[builder(setter(custom))]
@@ -115,16 +133,23 @@ struct TestReshardingParameters {

impl TestReshardingParametersBuilder {
fn build(self) -> TestReshardingParameters {
// Give enough time for GC to kick in after resharding.
assert!(GC_NUM_EPOCHS_TO_KEEP + 2 < TESTLOOP_NUM_EPOCHS_TO_WAIT);
let epoch_length = self.epoch_length.unwrap_or(DEFAULT_EPOCH_LENGTH);
let tracked_shard_schedule = self.tracked_shard_schedule.unwrap_or(None);

let num_accounts = self.num_accounts.unwrap_or(8);
let num_clients = self.num_clients.unwrap_or(7);
let num_producers = self.num_producers.unwrap_or(3);
let num_validators = self.num_validators.unwrap_or(2);
let num_rpcs = self.num_rpcs.unwrap_or(1);
let num_archivals = self.num_archivals.unwrap_or(1);
let num_extra_nodes = if tracked_shard_schedule.is_some() { 1 } else { 0 };

assert!(num_clients >= num_producers + num_validators + num_rpcs + num_archivals);
assert!(
num_clients
>= num_producers + num_validators + num_rpcs + num_archivals + num_extra_nodes
);

// #12195 prevents the number of BPs from being bigger than `epoch_length`.
assert!(num_producers > 0 && num_producers <= epoch_length);
@@ -157,20 +182,36 @@ impl TestReshardingParametersBuilder {
let validators = validators.to_vec();
let (rpcs, tmp) = tmp.split_at(num_rpcs as usize);
let rpcs = rpcs.to_vec();
let (archivals, _) = tmp.split_at(num_archivals as usize);
let (archivals, clients_without_role) = tmp.split_at(num_archivals as usize);
let archivals = archivals.to_vec();

if let Some(tracked_shard_schedule) = &tracked_shard_schedule {
assert!(clients_without_role.contains(&clients[tracked_shard_schedule.client_index]));
let schedule_length = tracked_shard_schedule.schedule.len();
assert!(schedule_length > TESTLOOP_NUM_EPOCHS_TO_WAIT as usize);
for i in
(TESTLOOP_NUM_EPOCHS_TO_WAIT - GC_NUM_EPOCHS_TO_KEEP - 1) as usize..schedule_length
{
assert_eq!(
tracked_shard_schedule.schedule[i - 1],
tracked_shard_schedule.schedule[i]
);
}
}

let client_index =
if rpcs.is_empty() { 0 } else { num_producers + num_validators } as usize;
let client_id = clients[client_index].clone();

println!("Clients setup:");
println!("Producers: {producers:?}");
println!("Validators: {validators:?}");
println!("Rpcs: {rpcs:?}, to serve requests we use client: {client_id}");
println!("Rpcs: {rpcs:?}");
println!("Archivals: {archivals:?}");
println!("To serve requests, we use client: {client_id}");
println!("Num extra nodes: {num_extra_nodes}");

let new_boundary_account: AccountId = "account6".parse().unwrap();
let new_boundary_account: AccountId = NEW_BOUNDARY_ACCOUNT.parse().unwrap();
let temporary_account_id: AccountId =
format!("{}.{}", new_boundary_account, new_boundary_account).parse().unwrap();
let mut loop_actions = self.loop_actions.unwrap_or_default();
Expand All @@ -186,7 +227,9 @@ impl TestReshardingParametersBuilder {
}

TestReshardingParameters {
base_shard_layout_version: self.base_shard_layout_version.unwrap_or(2),
base_shard_layout_version: self
.base_shard_layout_version
.unwrap_or(DEFAULT_SHARD_LAYOUT_VERSION),
num_accounts,
num_clients,
num_producers,
Expand All @@ -208,6 +251,7 @@ impl TestReshardingParametersBuilder {
.shuffle_shard_assignment_for_chunk_producers
.unwrap_or(false),
track_all_shards: self.track_all_shards.unwrap_or(false),
tracked_shard_schedule,
load_mem_tries_for_tracked_shards: self
.load_mem_tries_for_tracked_shards
.unwrap_or(true),
@@ -266,12 +310,20 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {

init_test_logger();
let mut builder = TestLoopBuilder::new();
let tracked_shard_schedule = params.tracked_shard_schedule.clone();

// Adjust the resharding configuration to make the tests faster.
builder = builder.config_modifier(|config, _| {
builder = builder.config_modifier(move |config, client_index| {
// Adjust the resharding configuration to make the tests faster.
let mut resharding_config = config.resharding_config.get();
resharding_config.batch_delay = Duration::milliseconds(1);
config.resharding_config.update(resharding_config);
// Set the tracked shard schedule if specified for the client at the given index.
if let Some(tracked_shard_schedule) = &tracked_shard_schedule {
if client_index == tracked_shard_schedule.client_index {
config.tracked_shards = vec![];
config.tracked_shard_schedule = tracked_shard_schedule.schedule.clone();
}
}
});

// Prepare shard split configuration.
@@ -356,6 +408,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
base_protocol_version + 1,
params.chunk_ranges_to_drop.clone(),
)
.gc_num_epochs_to_keep(GC_NUM_EPOCHS_TO_KEEP)
.build();

let mut test_setup_transactions = vec![];
@@ -403,7 +456,6 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
client_handles.iter().map(|handle| &env.test_loop.data.get(handle).client).collect_vec();
let mut trie_sanity_check =
TrieSanityCheck::new(&clients, params.load_mem_tries_for_tracked_shards);
let gc_num_epochs_to_keep = clients[client_index].config.gc.gc_num_epochs_to_keep;

let latest_block_height = Cell::new(0u64);
// Height of a block after resharding.
@@ -456,6 +508,8 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
// Just passed an epoch with increased number of shards.
new_layout_block_height.set(Some(latest_block_height.get()));
new_layout_epoch_height.set(Some(epoch_height));
// Assert that GC will have a chance to kick in before the test is over.
assert!(epoch_height + GC_NUM_EPOCHS_TO_KEEP < TESTLOOP_NUM_EPOCHS_TO_WAIT);
println!("State after resharding:");
print_and_assert_shard_accounts(&clients, &tip);
}
@@ -467,7 +521,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
);

// Return false if garbage collection window has not passed yet since resharding.
if epoch_height <= new_layout_epoch_height.get().unwrap() + gc_num_epochs_to_keep {
if epoch_height <= new_layout_epoch_height.get().unwrap() + GC_NUM_EPOCHS_TO_KEEP {
return false;
}
for loop_action in &params.loop_actions {
@@ -478,8 +532,8 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {

env.test_loop.run_until(
success_condition,
// Give enough time to produce ~7 epochs.
Duration::seconds((7 * params.epoch_length) as i64),
// Give enough time to produce ~TESTLOOP_NUM_EPOCHS_TO_WAIT epochs.
Duration::seconds((TESTLOOP_NUM_EPOCHS_TO_WAIT * params.epoch_length) as i64),
);
let client = &env.test_loop.data.get(&client_handles[client_index]).client;
trie_sanity_check.check_epochs(client);
Expand All @@ -492,6 +546,48 @@ fn test_resharding_v3() {
test_resharding_v3_base(TestReshardingParametersBuilder::default().build());
}

// Takes a sequence of shard ids to track in consecutive epochs,
// repeats the last element `TESTLOOP_NUM_EPOCHS_TO_WAIT` times,
// and maps each element: |id| -> vec![id], to the format required by `TrackedShardSchedule`.
fn shard_sequence_to_schedule(mut shard_sequence: Vec<ShardId>) -> Vec<Vec<ShardId>> {
shard_sequence.extend(
std::iter::repeat(*shard_sequence.last().unwrap())
.take(TESTLOOP_NUM_EPOCHS_TO_WAIT as usize),
);
shard_sequence.iter().map(|shard_id| vec![*shard_id]).collect()
}

#[test]
// TODO(resharding): fix nearcore and un-ignore this test
#[ignore]
fn test_resharding_v3_state_cleanup() {
// Track parent shard before resharding, child shard after resharding, and then an unrelated shard forever.
// Eventually, the State column should only contain entries belonging to the last tracked shard.
let account_in_stable_shard: AccountId = "account0".parse().unwrap();
let split_boundary_account: AccountId = NEW_BOUNDARY_ACCOUNT.parse().unwrap();
let base_shard_layout = get_base_shard_layout(DEFAULT_SHARD_LAYOUT_VERSION);
let new_shard_layout =
ShardLayout::derive_shard_layout(&base_shard_layout, split_boundary_account.clone());
let parent_shard_id = base_shard_layout.account_id_to_shard_id(&split_boundary_account);
let child_shard_id = new_shard_layout.account_id_to_shard_id(&split_boundary_account);
let unrelated_shard_id = new_shard_layout.account_id_to_shard_id(&account_in_stable_shard);

let tracked_shard_sequence =
vec![parent_shard_id, parent_shard_id, child_shard_id, unrelated_shard_id];
let num_clients = 8;
let tracked_shard_schedule = TrackedShardSchedule {
client_index: (num_clients - 1) as usize,
schedule: shard_sequence_to_schedule(tracked_shard_sequence),
};
test_resharding_v3_base(
TestReshardingParametersBuilder::default()
.num_clients(num_clients)
.tracked_shard_schedule(Some(tracked_shard_schedule.clone()))
.add_loop_action(check_state_cleanup_after_resharding(tracked_shard_schedule))
.build(),
);
}

#[test]
fn test_resharding_v3_track_all_shards() {
test_resharding_v3_base(
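As a rough illustration of the expansion performed by `shard_sequence_to_schedule` above, here is a standalone sketch with plain `u64` ids standing in for `ShardId` and `TESTLOOP_NUM_EPOCHS_TO_WAIT = 8` inlined:

```rust
/// Standalone sketch of `shard_sequence_to_schedule`, with plain u64 ids
/// standing in for `ShardId` and TESTLOOP_NUM_EPOCHS_TO_WAIT = 8 inlined.
fn expand(mut seq: Vec<u64>) -> Vec<Vec<u64>> {
    // Repeat the last element 8 more times, then wrap each id in its own Vec.
    seq.extend(std::iter::repeat(*seq.last().unwrap()).take(8));
    seq.iter().map(|id| vec![*id]).collect()
}

fn main() {
    // parent = 1, child = 2, unrelated = 0 (hypothetical ids): track the
    // parent for two epochs, a child for one, then the unrelated shard.
    let schedule = expand(vec![1, 1, 2, 0]);
    assert_eq!(schedule.len(), 12); // 4 entries + 8 repeats of the last one
    // The constant suffix is what lets GC clean up all other shards' state.
    assert!(schedule[3..].iter().all(|s| *s == vec![0]));
}
```

The resulting length (12) exceeds `TESTLOOP_NUM_EPOCHS_TO_WAIT`, and the constant suffix satisfies the repetition check in `TestReshardingParametersBuilder::build`.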
(The diff for the remaining 2 changed files is not shown.)
