From 5feed5806aa0d5b7b74ad851f96841170128f9d0 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Thu, 22 Feb 2024 16:44:52 +0200 Subject: [PATCH 1/5] node: Add support for lazy-initialization of all services --- node/src/lib.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/node/src/lib.rs b/node/src/lib.rs index 735f0b70a8..02ad2db1f3 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -87,6 +87,13 @@ pub trait Network: Send + Sync + 'static { pub trait LongLivedService: Send + Sync { + async fn initialize( + &mut self, + network: Arc>, + database: Arc>, + vm: Arc>, + ) -> anyhow::Result<()>; + async fn execute( &mut self, network: Arc>, @@ -157,6 +164,25 @@ impl Node { self.network.clone() } + pub async fn initialize( + &self, + service_list: &mut Vec>>, + ) -> anyhow::Result<()> { + // Run lazy-initialization of all registered services + for mut service in service_list.into_iter() { + info!("initialize service {}", service.name()); + service + .initialize( + self.network.clone(), + self.database.clone(), + self.vm_handler.clone(), + ) + .await?; + } + + Ok(()) + } + /// Sets up and runs a list of services. pub async fn spawn_all( &self, From ec2ac2c076118f94a4f15c0fc2c59eb56e1ca702 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Thu, 22 Feb 2024 16:50:52 +0200 Subject: [PATCH 2/5] node: Impl empty initialize for databroker and mempool --- node/src/databroker.rs | 9 +++++++++ node/src/lib.rs | 4 ++-- node/src/mempool.rs | 9 +++++++++ 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 5524668f2f..6706523a5e 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -93,6 +93,15 @@ impl DataBrokerSrv { impl LongLivedService for DataBrokerSrv { + async fn initialize( + &mut self, + _network: Arc>, + _db: Arc>, + _vm: Arc>, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn execute( &mut self, network: Arc>, diff --git a/node/src/lib.rs b/node/src/lib.rs index 02ad2db1f3..ea879515ac 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -166,10 +166,10 @@ impl Node { pub async fn initialize( &self, - service_list: &mut Vec>>, + services: &mut [Box>], ) -> anyhow::Result<()> { // Run lazy-initialization of all registered services - for mut service in service_list.into_iter() { + for service in services.iter_mut() { info!("initialize service {}", service.name()); service .initialize( diff --git a/node/src/mempool.rs b/node/src/mempool.rs index bd127cea03..0f85e466dd 100644 --- a/node/src/mempool.rs +++ b/node/src/mempool.rs @@ -55,6 +55,15 @@ impl crate::Filter for TxFilter { impl LongLivedService for MempoolSrv { + async fn initialize( + &mut self, + _network: Arc>, + _db: Arc>, + _vm: Arc>, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn execute( &mut self, network: Arc>, From 8c518df029eb2043b50a48f2e6dce286f578bb8d Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Thu, 22 Feb 2024 16:53:30 +0200 Subject: [PATCH 3/5] node: Instantiate Acceptor in ChainSrv::Initialize --- node/src/chain.rs | 50 ++++++++++++++++++++++++-------------- node/src/chain/acceptor.rs | 3 +-- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/node/src/chain.rs b/node/src/chain.rs index 3932eb846a..f38365b358 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -44,38 +44,29 @@ const TOPICS: &[u8] = &[ const ACCEPT_BLOCK_TIMEOUT_SEC: Duration = Duration::from_secs(20); const HEARTBEAT_SEC: Duration = Duration::from_secs(1); -pub struct ChainSrv { +pub struct ChainSrv { /// Inbound wire messages queue inbound: AsyncQueue, keys_path: String, + acceptor: Option>>>, } #[async_trait] impl - LongLivedService for ChainSrv + LongLivedService for ChainSrv { - async fn execute( + async fn initialize( &mut self, network: Arc>, db: Arc>, vm: Arc>, - ) -> anyhow::Result { - // Register routes - LongLivedService::::add_routes( - self, - TOPICS, - self.inbound.clone(), - &network, - ) - .await?; - - // Restore/Load most recent block + ) -> anyhow::Result<()> { let mrb = Self::load_most_recent_block(db.clone(), vm.clone()).await?; let state_hash = mrb.inner().header().state_hash; let provisioners_list = vm.read().await.get_provisioners(state_hash)?; - // Initialize Acceptor and trigger consensus task + // Initialize Acceptor let acc = Acceptor::init_consensus( &self.keys_path, mrb, @@ -85,7 +76,29 @@ impl vm.clone(), ) .await?; - let acc = Arc::new(RwLock::new(acc)); + + self.acceptor = Some(Arc::new(RwLock::new(acc))); + + Ok(()) + } + + async fn execute( + &mut self, + network: Arc>, + _db: Arc>, + _vm: Arc>, + ) -> anyhow::Result { + // Register routes + LongLivedService::::add_routes( + self, + TOPICS, + self.inbound.clone(), + &network, + ) + .await?; + + let acc = self.acceptor.as_mut().unwrap(); + acc.write().await.spawn_task().await; // Start-up FSM instance let mut fsm = SimpleFSM::new(acc.clone(), network.clone()); @@ -200,11 +213,12 @@ impl } } -impl ChainSrv { +impl ChainSrv { pub fn new(keys_path: String) -> Self { Self { inbound: Default::default(), keys_path, + acceptor: None, } } @@ -213,7 +227,7 @@ impl ChainSrv { /// Panics /// /// If register entry is read but block is not found. - async fn load_most_recent_block( + async fn load_most_recent_block( db: Arc>, vm: Arc>, ) -> Result { diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 59b6dbfad8..801497af29 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -161,11 +161,10 @@ impl Acceptor { acc.try_revert(RevertTarget::LastFinalizedState).await?; } - acc.spawn_task().await; Ok(acc) } - async fn spawn_task(&self) { + pub async fn spawn_task(&self) { let provisioners_list = self.provisioners_list.read().await.clone(); let base_timeouts = self.adjust_round_base_timeouts().await; From b3fd1fb123e566273be4dec8886026d43933f3d6 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Thu, 22 Feb 2024 16:55:12 +0200 Subject: [PATCH 4/5] rusk: Call Node::Initialize --- rusk/src/bin/main.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rusk/src/bin/main.rs b/rusk/src/bin/main.rs index 442781d94c..53200724da 100644 --- a/rusk/src/bin/main.rs +++ b/rusk/src/bin/main.rs @@ -95,7 +95,7 @@ async fn main() -> Result<(), Box> { }; #[cfg(feature = "node")] - let (rusk, node, service_list) = { + let (rusk, node, mut service_list) = { let state_dir = rusk_profile::get_rusk_state_dir()?; info!("Using state from {state_dir:?}"); let rusk = Rusk::new(state_dir)?; @@ -154,6 +154,13 @@ async fn main() -> Result<(), Box> { Some(HttpServer::bind(handler, listen_addr, cert_and_key).await?); } + #[cfg(feature = "node")] + // initialize all registered services + if let Err(err) = node.0.initialize(&mut service_list).await { + tracing::error!("node initialization failed: {}", err); + return Err(err.into()); + } + #[cfg(feature = "node")] // node spawn_all is the entry point if let Err(e) = node.0.spawn_all(service_list).await { From 359fa558f8479d96defd7ec0822def9faa9d16a4 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 27 Feb 2024 11:34:38 +0200 Subject: [PATCH 5/5] node: Add expect --- node/src/chain.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/chain.rs b/node/src/chain.rs index f38365b358..086db131a4 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -97,7 +97,7 @@ impl ) .await?; - let acc = self.acceptor.as_mut().unwrap(); + let acc = self.acceptor.as_mut().expect("initialize is called"); acc.write().await.spawn_task().await; // Start-up FSM instance