From b0d8e1dd3975ad4476063befae2c61a0cb6cd434 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Mon, 4 Sep 2023 16:15:51 +0200 Subject: [PATCH] refactor fuel-indexer run command to take advantage of tokio::task::JoinSet --- Cargo.lock | 1 - packages/fuel-indexer/Cargo.toml | 1 - packages/fuel-indexer/src/commands/run.rs | 102 ++++++++++------------ 3 files changed, 44 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9779e287a..d8a0e6cfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3227,7 +3227,6 @@ dependencies = [ "sqlx", "thiserror", "tokio", - "tokio-util", "tracing", "wasmer", "wasmer-middlewares", diff --git a/packages/fuel-indexer/Cargo.toml b/packages/fuel-indexer/Cargo.toml index edf9fc26f..334653a3a 100644 --- a/packages/fuel-indexer/Cargo.toml +++ b/packages/fuel-indexer/Cargo.toml @@ -36,7 +36,6 @@ itertools = "0.10" sqlx = { version = "0.6", features = ["bigdecimal"] } thiserror = { workspace = true } tokio = { features = ["macros", "rt-multi-thread", "sync", "process"], workspace = true } -tokio-util = { workspace = true } tracing = { workspace = true } wasmer = "4" wasmer-middlewares = "4" diff --git a/packages/fuel-indexer/src/commands/run.rs b/packages/fuel-indexer/src/commands/run.rs index 1cdc347f2..8952ca28f 100644 --- a/packages/fuel-indexer/src/commands/run.rs +++ b/packages/fuel-indexer/src/commands/run.rs @@ -8,55 +8,47 @@ use fuel_indexer_lib::{ }; use tokio::signal::unix::{signal, Signal, SignalKind}; use tokio::sync::mpsc::channel; -use tokio_util::sync::CancellationToken; -use tracing::info; +use tracing::{error, info}; #[cfg(feature = "api-server")] use fuel_indexer_api_server::api::WebApi; -// Returns a CancellationToken which will be notified when a shutdown signal -// have been reveived. -async fn shutdown_signal_handler() -> anyhow::Result { - let cancel_token = tokio_util::sync::CancellationToken::new(); - +// Returns a future which completes when a shutdown signal has been reveived. +fn shutdown_signal_handler() -> std::io::Result> { let mut sighup: Signal = signal(SignalKind::hangup())?; let mut sigterm: Signal = signal(SignalKind::terminate())?; let mut sigint: Signal = signal(SignalKind::interrupt())?; - tokio::spawn({ - let cancel_token = cancel_token.clone(); - - async move { - #[cfg(unix)] - { - tokio::select! { - _ = sighup.recv() => { - info!("Received SIGHUP. Stopping services."); - } - _ = sigterm.recv() => { - info!("Received SIGTERM. Stopping services."); - } - _ = sigint.recv() => { - info!("Received SIGINT. Stopping services."); - } + let future = async move { + #[cfg(unix)] + { + tokio::select! { + _ = sighup.recv() => { + info!("Received SIGHUP. Stopping services."); + } + _ = sigterm.recv() => { + info!("Received SIGTERM. Stopping services."); + } + _ = sigint.recv() => { + info!("Received SIGINT. Stopping services."); } - cancel_token.cancel(); } + } - #[cfg(not(unix))] - { - signal::ctrl_c().await?; - info!("Received CTRL+C. Stopping services."); - } + #[cfg(not(unix))] + { + signal::ctrl_c().await?; + info!("Received CTRL+C. Stopping services."); } - }); + }; - Ok(cancel_token) + Ok(future) } pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { - // for graceful shutdown - let cancel_token = shutdown_signal_handler().await?; + let mut subsystems: tokio::task::JoinSet<()> = tokio::task::JoinSet::new(); + + subsystems.spawn(shutdown_signal_handler()?); let IndexerArgs { manifest, @@ -138,16 +130,20 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { } } - let service_handle = tokio::spawn(service.run()); + subsystems.spawn(service.run()); #[cfg(feature = "api-server")] - let web_handle = tokio::spawn(WebApi::build_and_run(config.clone(), pool, tx)); - - #[cfg(not(feature = "api-server"))] - let web_handle = tokio::spawn(futures::future::ready(())); + subsystems.spawn({ + let config = config.clone(); + async { + if let Err(e) = WebApi::build_and_run(config, pool, tx).await { + error!("Api Server failed: {e}"); + } + } + }); #[cfg(feature = "fuel-core-lib")] - let node_handle = { + { use fuel_core::service::{Config, FuelService}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -156,28 +152,18 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 4000), ..Config::local_node() }; - tokio::spawn(async move { - let node = FuelService::new_node(config).await.unwrap(); - Some(node) - }) - } else { - tokio::spawn(futures::future::ready(None)) + subsystems.spawn(async move { + if let Err(e) = FuelService::new_node(config).await { + error!("Fuen Node failed: {e}"); + }; + }); } }; - #[cfg(not(feature = "fuel-core-lib"))] - let node_handle = tokio::spawn(futures::future::ready(())); - - // spawn application as separate task - tokio::spawn({ - let cancel_token = cancel_token.clone(); - async move { - let _ = tokio::join!(service_handle, node_handle, web_handle); - cancel_token.cancel(); - } - }); - - cancel_token.cancelled().await; + // Each subsystem runs its own loop, and we require all subsystems for the + // Indexer service to operate correctly. If any of the subsystems stops + // running, the entire Indexer Service exits. + subsystems.join_next().await; if embedded_database { let name = postgres_database.unwrap_or(defaults::POSTGRES_DATABASE.to_string());