Skip to content

Commit

Permalink
Merge pull request #1454 from dusk-network/init_services
Browse files Browse the repository at this point in the history
Support lazy-initialization for all services
  • Loading branch information
goshawk-3 authored Feb 27, 2024
2 parents 92af8dd + 359fa55 commit 33577c2
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 21 deletions.
50 changes: 32 additions & 18 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,38 +44,29 @@ const TOPICS: &[u8] = &[
const ACCEPT_BLOCK_TIMEOUT_SEC: Duration = Duration::from_secs(20);
const HEARTBEAT_SEC: Duration = Duration::from_secs(1);

pub struct ChainSrv {
pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
/// Inbound wire messages queue
inbound: AsyncQueue<Message>,
keys_path: String,
acceptor: Option<Arc<RwLock<Acceptor<N, DB, VM>>>>,
}

#[async_trait]
impl<N: Network, DB: database::DB, VM: vm::VMExecution>
LongLivedService<N, DB, VM> for ChainSrv
LongLivedService<N, DB, VM> for ChainSrv<N, DB, VM>
{
async fn execute(
async fn initialize(
&mut self,
network: Arc<RwLock<N>>,
db: Arc<RwLock<DB>>,
vm: Arc<RwLock<VM>>,
) -> anyhow::Result<usize> {
// Register routes
LongLivedService::<N, DB, VM>::add_routes(
self,
TOPICS,
self.inbound.clone(),
&network,
)
.await?;

// Restore/Load most recent block
) -> anyhow::Result<()> {
let mrb = Self::load_most_recent_block(db.clone(), vm.clone()).await?;

let state_hash = mrb.inner().header().state_hash;
let provisioners_list = vm.read().await.get_provisioners(state_hash)?;

// Initialize Acceptor and trigger consensus task
// Initialize Acceptor
let acc = Acceptor::init_consensus(
&self.keys_path,
mrb,
Expand All @@ -85,7 +76,29 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
vm.clone(),
)
.await?;
let acc = Arc::new(RwLock::new(acc));

self.acceptor = Some(Arc::new(RwLock::new(acc)));

Ok(())
}

async fn execute(
&mut self,
network: Arc<RwLock<N>>,
_db: Arc<RwLock<DB>>,
_vm: Arc<RwLock<VM>>,
) -> anyhow::Result<usize> {
// Register routes
LongLivedService::<N, DB, VM>::add_routes(
self,
TOPICS,
self.inbound.clone(),
&network,
)
.await?;

let acc = self.acceptor.as_mut().expect("initialize is called");
acc.write().await.spawn_task().await;

// Start-up FSM instance
let mut fsm = SimpleFSM::new(acc.clone(), network.clone());
Expand Down Expand Up @@ -200,11 +213,12 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
}
}

impl ChainSrv {
impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
pub fn new(keys_path: String) -> Self {
Self {
inbound: Default::default(),
keys_path,
acceptor: None,
}
}

Expand All @@ -213,7 +227,7 @@ impl ChainSrv {
/// Panics
///
/// If register entry is read but block is not found.
async fn load_most_recent_block<DB: database::DB, VM: vm::VMExecution>(
async fn load_most_recent_block(
db: Arc<RwLock<DB>>,
vm: Arc<RwLock<VM>>,
) -> Result<BlockWithLabel> {
Expand Down
3 changes: 1 addition & 2 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,10 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
acc.try_revert(RevertTarget::LastFinalizedState).await?;
}

acc.spawn_task().await;
Ok(acc)
}

async fn spawn_task(&self) {
pub async fn spawn_task(&self) {
let provisioners_list = self.provisioners_list.read().await.clone();
let base_timeouts = self.adjust_round_base_timeouts().await;

Expand Down
9 changes: 9 additions & 0 deletions node/src/databroker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ impl DataBrokerSrv {
impl<N: Network, DB: database::DB, VM: vm::VMExecution>
LongLivedService<N, DB, VM> for DataBrokerSrv
{
async fn initialize(
&mut self,
_network: Arc<RwLock<N>>,
_db: Arc<RwLock<DB>>,
_vm: Arc<RwLock<VM>>,
) -> anyhow::Result<()> {
Ok(())
}

async fn execute(
&mut self,
network: Arc<RwLock<N>>,
Expand Down
26 changes: 26 additions & 0 deletions node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ pub trait Network: Send + Sync + 'static {
pub trait LongLivedService<N: Network, DB: database::DB, VM: vm::VMExecution>:
Send + Sync
{
async fn initialize(
&mut self,
network: Arc<RwLock<N>>,
database: Arc<RwLock<DB>>,
vm: Arc<RwLock<VM>>,
) -> anyhow::Result<()>;

async fn execute(
&mut self,
network: Arc<RwLock<N>>,
Expand Down Expand Up @@ -157,6 +164,25 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> Node<N, DB, VM> {
self.network.clone()
}

pub async fn initialize(
&self,
services: &mut [Box<dyn LongLivedService<N, DB, VM>>],
) -> anyhow::Result<()> {
// Run lazy-initialization of all registered services
for service in services.iter_mut() {
info!("initialize service {}", service.name());
service
.initialize(
self.network.clone(),
self.database.clone(),
self.vm_handler.clone(),
)
.await?;
}

Ok(())
}

/// Sets up and runs a list of services.
pub async fn spawn_all(
&self,
Expand Down
9 changes: 9 additions & 0 deletions node/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ impl crate::Filter for TxFilter {
impl<N: Network, DB: database::DB, VM: vm::VMExecution>
LongLivedService<N, DB, VM> for MempoolSrv
{
async fn initialize(
&mut self,
_network: Arc<RwLock<N>>,
_db: Arc<RwLock<DB>>,
_vm: Arc<RwLock<VM>>,
) -> anyhow::Result<()> {
Ok(())
}

async fn execute(
&mut self,
network: Arc<RwLock<N>>,
Expand Down
9 changes: 8 additions & 1 deletion rusk/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};

#[cfg(feature = "node")]
let (rusk, node, service_list) = {
let (rusk, node, mut service_list) = {
let state_dir = rusk_profile::get_rusk_state_dir()?;
info!("Using state from {state_dir:?}");
let rusk = Rusk::new(state_dir)?;
Expand Down Expand Up @@ -154,6 +154,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Some(HttpServer::bind(handler, listen_addr, cert_and_key).await?);
}

#[cfg(feature = "node")]
// initialize all registered services
if let Err(err) = node.0.initialize(&mut service_list).await {
tracing::error!("node initialization failed: {}", err);
return Err(err.into());
}

#[cfg(feature = "node")]
// node spawn_all is the entry point
if let Err(e) = node.0.spawn_all(service_list).await {
Expand Down

0 comments on commit 33577c2

Please sign in to comment.