diff --git a/node/src/chain.rs b/node/src/chain.rs index 3932eb846a..086db131a4 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -44,38 +44,29 @@ const TOPICS: &[u8] = &[ const ACCEPT_BLOCK_TIMEOUT_SEC: Duration = Duration::from_secs(20); const HEARTBEAT_SEC: Duration = Duration::from_secs(1); -pub struct ChainSrv { +pub struct ChainSrv { /// Inbound wire messages queue inbound: AsyncQueue, keys_path: String, + acceptor: Option>>>, } #[async_trait] impl - LongLivedService for ChainSrv + LongLivedService for ChainSrv { - async fn execute( + async fn initialize( &mut self, network: Arc>, db: Arc>, vm: Arc>, - ) -> anyhow::Result { - // Register routes - LongLivedService::::add_routes( - self, - TOPICS, - self.inbound.clone(), - &network, - ) - .await?; - - // Restore/Load most recent block + ) -> anyhow::Result<()> { let mrb = Self::load_most_recent_block(db.clone(), vm.clone()).await?; let state_hash = mrb.inner().header().state_hash; let provisioners_list = vm.read().await.get_provisioners(state_hash)?; - // Initialize Acceptor and trigger consensus task + // Initialize Acceptor let acc = Acceptor::init_consensus( &self.keys_path, mrb, @@ -85,7 +76,29 @@ impl vm.clone(), ) .await?; - let acc = Arc::new(RwLock::new(acc)); + + self.acceptor = Some(Arc::new(RwLock::new(acc))); + + Ok(()) + } + + async fn execute( + &mut self, + network: Arc>, + _db: Arc>, + _vm: Arc>, + ) -> anyhow::Result { + // Register routes + LongLivedService::::add_routes( + self, + TOPICS, + self.inbound.clone(), + &network, + ) + .await?; + + let acc = self.acceptor.as_mut().expect("initialize is called"); + acc.write().await.spawn_task().await; // Start-up FSM instance let mut fsm = SimpleFSM::new(acc.clone(), network.clone()); @@ -200,11 +213,12 @@ impl } } -impl ChainSrv { +impl ChainSrv { pub fn new(keys_path: String) -> Self { Self { inbound: Default::default(), keys_path, + acceptor: None, } } @@ -213,7 +227,7 @@ impl ChainSrv { /// Panics /// /// If register entry is read but block is not found. - async fn load_most_recent_block( + async fn load_most_recent_block( db: Arc>, vm: Arc>, ) -> Result { diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 59b6dbfad8..801497af29 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -161,11 +161,10 @@ impl Acceptor { acc.try_revert(RevertTarget::LastFinalizedState).await?; } - acc.spawn_task().await; Ok(acc) } - async fn spawn_task(&self) { + pub async fn spawn_task(&self) { let provisioners_list = self.provisioners_list.read().await.clone(); let base_timeouts = self.adjust_round_base_timeouts().await; diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 5524668f2f..6706523a5e 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -93,6 +93,15 @@ impl DataBrokerSrv { impl LongLivedService for DataBrokerSrv { + async fn initialize( + &mut self, + _network: Arc>, + _db: Arc>, + _vm: Arc>, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn execute( &mut self, network: Arc>, diff --git a/node/src/lib.rs b/node/src/lib.rs index 735f0b70a8..ea879515ac 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -87,6 +87,13 @@ pub trait Network: Send + Sync + 'static { pub trait LongLivedService: Send + Sync { + async fn initialize( + &mut self, + network: Arc>, + database: Arc>, + vm: Arc>, + ) -> anyhow::Result<()>; + async fn execute( &mut self, network: Arc>, @@ -157,6 +164,25 @@ impl Node { self.network.clone() } + pub async fn initialize( + &self, + services: &mut [Box>], + ) -> anyhow::Result<()> { + // Run lazy-initialization of all registered services + for service in services.iter_mut() { + info!("initialize service {}", service.name()); + service + .initialize( + self.network.clone(), + self.database.clone(), + self.vm_handler.clone(), + ) + .await?; + } + + Ok(()) + } + /// Sets up and runs a list of services. pub async fn spawn_all( &self, diff --git a/node/src/mempool.rs b/node/src/mempool.rs index bd127cea03..0f85e466dd 100644 --- a/node/src/mempool.rs +++ b/node/src/mempool.rs @@ -55,6 +55,15 @@ impl crate::Filter for TxFilter { impl LongLivedService for MempoolSrv { + async fn initialize( + &mut self, + _network: Arc>, + _db: Arc>, + _vm: Arc>, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn execute( &mut self, network: Arc>, diff --git a/rusk/src/bin/main.rs b/rusk/src/bin/main.rs index 442781d94c..53200724da 100644 --- a/rusk/src/bin/main.rs +++ b/rusk/src/bin/main.rs @@ -95,7 +95,7 @@ async fn main() -> Result<(), Box> { }; #[cfg(feature = "node")] - let (rusk, node, service_list) = { + let (rusk, node, mut service_list) = { let state_dir = rusk_profile::get_rusk_state_dir()?; info!("Using state from {state_dir:?}"); let rusk = Rusk::new(state_dir)?; @@ -154,6 +154,13 @@ async fn main() -> Result<(), Box> { Some(HttpServer::bind(handler, listen_addr, cert_and_key).await?); } + #[cfg(feature = "node")] + // initialize all registered services + if let Err(err) = node.0.initialize(&mut service_list).await { + tracing::error!("node initialization failed: {}", err); + return Err(err.into()); + } + #[cfg(feature = "node")] // node spawn_all is the entry point if let Err(e) = node.0.spawn_all(service_list).await {