Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support lazy-initialization for all services #1454

Merged
merged 5 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,38 +44,29 @@ const TOPICS: &[u8] = &[
const ACCEPT_BLOCK_TIMEOUT_SEC: Duration = Duration::from_secs(20);
const HEARTBEAT_SEC: Duration = Duration::from_secs(1);

pub struct ChainSrv {
pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
/// Inbound wire messages queue
inbound: AsyncQueue<Message>,
keys_path: String,
acceptor: Option<Arc<RwLock<Acceptor<N, DB, VM>>>>,
}

#[async_trait]
impl<N: Network, DB: database::DB, VM: vm::VMExecution>
LongLivedService<N, DB, VM> for ChainSrv
LongLivedService<N, DB, VM> for ChainSrv<N, DB, VM>
{
async fn execute(
async fn initialize(
&mut self,
network: Arc<RwLock<N>>,
db: Arc<RwLock<DB>>,
vm: Arc<RwLock<VM>>,
) -> anyhow::Result<usize> {
// Register routes
LongLivedService::<N, DB, VM>::add_routes(
self,
TOPICS,
self.inbound.clone(),
&network,
)
.await?;

// Restore/Load most recent block
) -> anyhow::Result<()> {
let mrb = Self::load_most_recent_block(db.clone(), vm.clone()).await?;

let state_hash = mrb.inner().header().state_hash;
let provisioners_list = vm.read().await.get_provisioners(state_hash)?;

// Initialize Acceptor and trigger consensus task
// Initialize Acceptor
let acc = Acceptor::init_consensus(
&self.keys_path,
mrb,
Expand All @@ -85,7 +76,29 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
vm.clone(),
)
.await?;
let acc = Arc::new(RwLock::new(acc));

self.acceptor = Some(Arc::new(RwLock::new(acc)));

Ok(())
}

async fn execute(
&mut self,
network: Arc<RwLock<N>>,
_db: Arc<RwLock<DB>>,
_vm: Arc<RwLock<VM>>,
) -> anyhow::Result<usize> {
// Register routes
LongLivedService::<N, DB, VM>::add_routes(
self,
TOPICS,
self.inbound.clone(),
&network,
)
.await?;

let acc = self.acceptor.as_mut().expect("initialize is called");
acc.write().await.spawn_task().await;

// Start-up FSM instance
let mut fsm = SimpleFSM::new(acc.clone(), network.clone());
Expand Down Expand Up @@ -200,11 +213,12 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
}
}

impl ChainSrv {
impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
pub fn new(keys_path: String) -> Self {
Self {
inbound: Default::default(),
keys_path,
acceptor: None,
}
}

Expand All @@ -213,7 +227,7 @@ impl ChainSrv {
/// Panics
///
/// If register entry is read but block is not found.
async fn load_most_recent_block<DB: database::DB, VM: vm::VMExecution>(
async fn load_most_recent_block(
db: Arc<RwLock<DB>>,
vm: Arc<RwLock<VM>>,
) -> Result<BlockWithLabel> {
Expand Down
3 changes: 1 addition & 2 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,10 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
acc.try_revert(RevertTarget::LastFinalizedState).await?;
}

acc.spawn_task().await;
Ok(acc)
}

async fn spawn_task(&self) {
pub async fn spawn_task(&self) {
let provisioners_list = self.provisioners_list.read().await.clone();
let base_timeouts = self.adjust_round_base_timeouts().await;

Expand Down
9 changes: 9 additions & 0 deletions node/src/databroker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ impl DataBrokerSrv {
impl<N: Network, DB: database::DB, VM: vm::VMExecution>
LongLivedService<N, DB, VM> for DataBrokerSrv
{
async fn initialize(
&mut self,
_network: Arc<RwLock<N>>,
_db: Arc<RwLock<DB>>,
_vm: Arc<RwLock<VM>>,
) -> anyhow::Result<()> {
Ok(())
}

async fn execute(
&mut self,
network: Arc<RwLock<N>>,
Expand Down
26 changes: 26 additions & 0 deletions node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ pub trait Network: Send + Sync + 'static {
pub trait LongLivedService<N: Network, DB: database::DB, VM: vm::VMExecution>:
Send + Sync
{
async fn initialize(
&mut self,
network: Arc<RwLock<N>>,
database: Arc<RwLock<DB>>,
vm: Arc<RwLock<VM>>,
) -> anyhow::Result<()>;

async fn execute(
&mut self,
network: Arc<RwLock<N>>,
Expand Down Expand Up @@ -157,6 +164,25 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> Node<N, DB, VM> {
self.network.clone()
}

pub async fn initialize(
&self,
services: &mut [Box<dyn LongLivedService<N, DB, VM>>],
) -> anyhow::Result<()> {
// Run lazy-initialization of all registered services
for service in services.iter_mut() {
info!("initialize service {}", service.name());
service
.initialize(
self.network.clone(),
self.database.clone(),
self.vm_handler.clone(),
)
.await?;
}

Ok(())
}

/// Sets up and runs a list of services.
pub async fn spawn_all(
&self,
Expand Down
9 changes: 9 additions & 0 deletions node/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ impl crate::Filter for TxFilter {
impl<N: Network, DB: database::DB, VM: vm::VMExecution>
LongLivedService<N, DB, VM> for MempoolSrv
{
async fn initialize(
&mut self,
_network: Arc<RwLock<N>>,
_db: Arc<RwLock<DB>>,
_vm: Arc<RwLock<VM>>,
) -> anyhow::Result<()> {
Ok(())
}

async fn execute(
&mut self,
network: Arc<RwLock<N>>,
Expand Down
9 changes: 8 additions & 1 deletion rusk/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};

#[cfg(feature = "node")]
let (rusk, node, service_list) = {
let (rusk, node, mut service_list) = {
let state_dir = rusk_profile::get_rusk_state_dir()?;
info!("Using state from {state_dir:?}");
let rusk = Rusk::new(state_dir)?;
Expand Down Expand Up @@ -154,6 +154,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Some(HttpServer::bind(handler, listen_addr, cert_and_key).await?);
}

#[cfg(feature = "node")]
// initialize all registered services
if let Err(err) = node.0.initialize(&mut service_list).await {
tracing::error!("node initialization failed: {}", err);
return Err(err.into());
}

#[cfg(feature = "node")]
// node spawn_all is the entry point
if let Err(e) = node.0.spawn_all(service_list).await {
Expand Down