feat(state-dumper): rewrite the state dumper logic (#12492)
The current state dumper code is difficult to follow and doesn't make
good use of the available cores to obtain and upload parts. It starts
one thread per shard that dumps one part on each iteration of a big
loop (which includes a fair amount of unnecessary/redundant lookups and
calculations). This commit rewrites the logic: instead of starting one
thread per shard and looping over part IDs like that, we figure out
which parts need to be dumped when we see a new epoch, and then spawn
futures to obtain and upload those parts. The part upload speed is now
limited by the number of allowed "obtain part" tasks (4) and by the
speed of generating those parts.
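
Roughly, the new flow looks like the sketch below: a simplified
illustration using the futures crate, with a hypothetical PartToDump
type and hypothetical obtain_part/upload_part helpers standing in for
the real dumper code changed in this commit.

use futures::stream::{self, StreamExt};

/// Hypothetical descriptor of one part to dump; the real dumper tracks
/// more per-part state than this.
struct PartToDump {
    shard_id: u64,
    part_id: u64,
}

/// Cap on concurrently running "obtain part" tasks, matching the limit
/// of 4 mentioned above.
const MAX_OBTAIN_PART_TASKS: usize = 4;

/// Dump every part of a newly seen epoch, obtaining and uploading at
/// most MAX_OBTAIN_PART_TASKS parts at a time.
async fn dump_new_epoch(parts: Vec<PartToDump>) {
    stream::iter(parts)
        .map(|part| async move {
            // Stand-ins for the real "obtain part" and "upload part" steps.
            let data = obtain_part(part.shard_id, part.part_id).await;
            upload_part(part.shard_id, part.part_id, data).await;
        })
        // Only MAX_OBTAIN_PART_TASKS part futures are driven at once, so
        // throughput is bounded by how fast parts can be generated.
        .buffer_unordered(MAX_OBTAIN_PART_TASKS)
        .for_each(|_| async {})
        .await;
}

async fn obtain_part(_shard_id: u64, _part_id: u64) -> Vec<u8> {
    Vec::new() // placeholder
}

async fn upload_part(_shard_id: u64, _part_id: u64, _data: Vec<u8>) {}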

This has the advantage of not needing any changes to work with dynamic
resharding, and the part upload is much faster. On a forknet run with
recent mainnet state, the old dumper takes around an hour and a half to
dump all the parts, while this version takes around half an hour (this
could likely be improved further by tweaking or making configurable the
number of tasks allowed to obtain parts at a time).

This could be refactored further, since there are still some leftover
structures from the previous implementation that don't fit in very
cleanly, but that can be done in a future PR.
marcelo-gonzalez authored Jan 7, 2025
1 parent 2d9de9b commit 1704c55
Showing 9 changed files with 917 additions and 499 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

39 changes: 29 additions & 10 deletions chain/chain/src/store/mod.rs
@@ -1025,16 +1025,35 @@ impl ChainStore {
key
}

/// Retrieves STATE_SYNC_DUMP for the given shard.
pub fn get_state_sync_dump_progress(
&self,
shard_id: ShardId,
) -> Result<StateSyncDumpProgress, Error> {
option_to_not_found(
self.store
.get_ser(DBCol::BlockMisc, &ChainStore::state_sync_dump_progress_key(shard_id)),
format!("STATE_SYNC_DUMP:{}", shard_id),
)
/// For each value stored, this returns an (EpochId, bool), where the bool tells whether the dump is finished,
/// because those are the only fields we really care about.
pub fn iter_state_sync_dump_progress<'a>(
&'a self,
) -> impl Iterator<Item = io::Result<(ShardId, (EpochId, bool))>> + 'a {
self.store
.iter_prefix_ser::<StateSyncDumpProgress>(DBCol::BlockMisc, STATE_SYNC_DUMP_KEY)
.map(|item| {
item.and_then(|(key, progress)| {
// + 1 for the ':'
let prefix_len = STATE_SYNC_DUMP_KEY.len() + 1;
let int_part = &key[prefix_len..];
let int_part = int_part.try_into().map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Bad StateSyncDump columnn key length: {}", key.len()),
)
})?;
let shard_id = ShardId::from_le_bytes(int_part);
Ok((
shard_id,
match progress {
StateSyncDumpProgress::AllDumped { epoch_id, .. } => (epoch_id, true),
StateSyncDumpProgress::InProgress { epoch_id, .. } => (epoch_id, false),
StateSyncDumpProgress::Skipped { epoch_id, .. } => (epoch_id, true),
},
))
})
})
}

/// Updates STATE_SYNC_DUMP for the given shard.
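
As a rough illustration of how the new iterator might be consumed, here
is a hypothetical helper that is not part of this diff; the crate paths
in the imports are assumed.

use std::collections::HashMap;

use near_chain::ChainStore;
use near_primitives::types::{EpochId, ShardId};

// Hypothetical caller: collect the latest dump status per shard, logging
// and skipping any entries that fail to deserialize.
fn dump_status_by_shard(chain_store: &ChainStore) -> HashMap<ShardId, (EpochId, bool)> {
    chain_store
        .iter_state_sync_dump_progress()
        .filter_map(|entry| match entry {
            Ok((shard_id, (epoch_id, finished))) => Some((shard_id, (epoch_id, finished))),
            Err(err) => {
                tracing::warn!(target: "state_sync_dump", ?err, "bad dump progress entry");
                None
            }
        })
        .collect()
}
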
20 changes: 1 addition & 19 deletions chain/client/src/sync/state/shard.rs
@@ -3,7 +3,7 @@ use super::task_tracker::TaskTracker;
use crate::metrics;
use crate::sync::state::chain_requests::ChainFinalizationRequest;
use futures::{StreamExt, TryStreamExt};
use near_async::futures::{FutureSpawner, FutureSpawnerExt};
use near_async::futures::{respawn_for_parallelism, FutureSpawner};
use near_async::messaging::AsyncSender;
use near_chain::types::RuntimeAdapter;
use near_chain::BlockHeader;
@@ -280,21 +280,3 @@ async fn apply_state_part(
)?;
Ok(())
}

/// Given a future, respawn it as an equivalent future but which does not block the
/// driver of the future. For example, if the given future directly performs
/// computation, normally the whoever drives the future (such as a buffered_unordered)
/// would be blocked by the computation, thereby not allowing computation of other
/// futures driven by the same driver to proceed. This function respawns the future
/// onto the FutureSpawner, so the driver of the returned future would not be blocked.
fn respawn_for_parallelism<T: Send + 'static>(
future_spawner: &dyn FutureSpawner,
name: &'static str,
f: impl std::future::Future<Output = T> + Send + 'static,
) -> impl std::future::Future<Output = T> + Send + 'static {
let (sender, receiver) = tokio::sync::oneshot::channel();
future_spawner.spawn(name, async move {
sender.send(f.await).ok();
});
async move { receiver.await.unwrap() }
}
18 changes: 18 additions & 0 deletions core/async/src/futures.rs
@@ -42,6 +42,24 @@ impl FutureSpawnerExt for dyn FutureSpawner + '_ {
}
}

/// Given a future, respawn it as an equivalent future that does not block the
/// driver of the future. For example, if the given future directly performs
/// computation, whoever drives the future (such as a buffered_unordered stream)
/// would normally be blocked by that computation, preventing other futures driven
/// by the same driver from making progress. This function respawns the future
/// onto the FutureSpawner, so the driver of the returned future is not blocked.
pub fn respawn_for_parallelism<T: Send + 'static>(
future_spawner: &dyn FutureSpawner,
name: &'static str,
f: impl std::future::Future<Output = T> + Send + 'static,
) -> impl std::future::Future<Output = T> + Send + 'static {
let (sender, receiver) = tokio::sync::oneshot::channel();
future_spawner.spawn(name, async move {
sender.send(f.await).ok();
});
async move { receiver.await.unwrap() }
}

/// A FutureSpawner that hands over the future to Actix.
pub struct ActixFutureSpawner;

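
For example, a hypothetical caller (not part of this diff) can wrap
CPU-heavy futures with respawn_for_parallelism before handing them to
buffer_unordered, so the driver stays free to poll other futures.

use futures::{stream, StreamExt};
use near_async::futures::{respawn_for_parallelism, FutureSpawner};

// The driver only ever polls lightweight oneshot receivers; the actual work
// runs on the FutureSpawner's threads, up to 4 futures in parallel here.
// Results are collected in completion order.
async fn process_all(future_spawner: &dyn FutureSpawner, inputs: Vec<u64>) -> Vec<u64> {
    stream::iter(inputs)
        .map(|x| {
            respawn_for_parallelism(future_spawner, "expensive work", async move {
                // stand-in for real computation, e.g. obtaining a state part
                x * 2
            })
        })
        .buffer_unordered(4)
        .collect::<Vec<u64>>()
        .await
}
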
1 change: 1 addition & 0 deletions integration-tests/src/test_loop/builder.rs
@@ -756,6 +756,7 @@ impl TestLoopBuilder {
future_spawner.spawn_boxed("state_sync_dumper", future);
Box::new(|| {})
}),
future_spawner: Arc::new(self.test_loop.future_spawner()),
handle: None,
};
let state_sync_dumper_handle = self.test_loop.data.register_data(state_sync_dumper);
3 changes: 3 additions & 0 deletions integration-tests/src/tests/client/state_dump.rs
@@ -1,5 +1,6 @@
use assert_matches::assert_matches;

use near_async::futures::ActixFutureSpawner;
use near_async::time::{Clock, Duration};
use near_chain::near_chain_primitives::error::QueryError;
use near_chain::{ChainGenesis, ChainStoreAccess, Provenance};
@@ -66,6 +67,7 @@ fn slow_test_state_dump() {
runtime,
validator,
dump_future_runner: StateSyncDumper::arbiter_dump_future_runner(),
future_spawner: Arc::new(ActixFutureSpawner),
handle: None,
};
state_sync_dumper.start().unwrap();
@@ -171,6 +173,7 @@ fn run_state_sync_with_dumped_parts(
runtime,
validator,
dump_future_runner: StateSyncDumper::arbiter_dump_future_runner(),
future_spawner: Arc::new(ActixFutureSpawner),
handle: None,
};
state_sync_dumper.start().unwrap();
1 change: 1 addition & 0 deletions nearcore/Cargo.toml
@@ -44,6 +44,7 @@ strum.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tracing.workspace = true
xz2.workspace = true

4 changes: 3 additions & 1 deletion nearcore/src/lib.rs
@@ -389,6 +389,7 @@ pub fn start_with_config_and_synchronization(
let state_sync_runtime =
Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap());

let state_sync_spawner = Arc::new(TokioRuntimeFutureSpawner(state_sync_runtime.clone()));
let StartClientResult { client_actor, client_arbiter_handle, resharding_handle } = start_client(
Clock::real(),
config.client_config.clone(),
@@ -397,7 +398,7 @@
shard_tracker.clone(),
runtime.clone(),
node_id,
Arc::new(TokioRuntimeFutureSpawner(state_sync_runtime.clone())),
state_sync_spawner.clone(),
network_adapter.as_multi_sender(),
shards_manager_adapter.as_sender(),
config.validator_signer.clone(),
@@ -434,6 +435,7 @@
runtime,
validator: config.validator_signer.clone(),
dump_future_runner: StateSyncDumper::arbiter_dump_future_runner(),
future_spawner: state_sync_spawner,
handle: None,
};
state_sync_dumper.start()?;