From 79b48bc8bd0642d4aa36fa6ae47c38ad8d41f5a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Mon, 4 Sep 2023 12:35:00 +0200 Subject: [PATCH] fuel-indexer: refactor run, service, and executor --- Cargo.lock | 1 - packages/fuel-indexer-benchmarks/src/lib.rs | 7 +- packages/fuel-indexer-tests/tests/service.rs | 5 +- packages/fuel-indexer/Cargo.toml | 1 - packages/fuel-indexer/src/commands/run.rs | 107 ++++---- packages/fuel-indexer/src/executor.rs | 175 +++++-------- packages/fuel-indexer/src/service.rs | 252 +++++++++---------- 7 files changed, 235 insertions(+), 313 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9779e287a..d8a0e6cfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3227,7 +3227,6 @@ dependencies = [ "sqlx", "thiserror", "tokio", - "tokio-util", "tracing", "wasmer", "wasmer-middlewares", diff --git a/packages/fuel-indexer-benchmarks/src/lib.rs b/packages/fuel-indexer-benchmarks/src/lib.rs index 22eafb10f..474b53643 100644 --- a/packages/fuel-indexer-benchmarks/src/lib.rs +++ b/packages/fuel-indexer-benchmarks/src/lib.rs @@ -7,7 +7,7 @@ use fuel_indexer::{ use fuel_indexer_database::IndexerConnectionPool; use fuel_indexer_lib::config::DatabaseConfig; use fuel_indexer_tests::fixtures::TestPostgresDb; -use std::{str::FromStr, sync::atomic::AtomicBool}; +use std::str::FromStr; /// Location of Fuel node to be used for block retrieval. pub const NODE_URL: &str = "beta-4.fuel.network:80"; @@ -73,7 +73,6 @@ pub fn create_wasm_indexer_benchmark( .unwrap(); let blocks = rt.block_on(get_blocks(start_block, num_blocks)).unwrap(); c.bench_function(name, move |b| { - let kill_switch = std::sync::Arc::new(AtomicBool::new(false)); b.iter_batched( // This setup function is run prior to each iteration of // the benchmark; this ensures that there is a fresh WASM @@ -92,9 +91,7 @@ pub fn create_wasm_indexer_benchmark( (executor, blocks.clone()) }) }, - |(mut ex, blocks)| { - rt.block_on(ex.handle_events(kill_switch.clone(), blocks)) - }, + |(mut ex, blocks)| rt.block_on(ex.handle_events(blocks)), criterion::BatchSize::SmallInput, ) }); diff --git a/packages/fuel-indexer-tests/tests/service.rs b/packages/fuel-indexer-tests/tests/service.rs index 20a306aff..28c29add1 100644 --- a/packages/fuel-indexer-tests/tests/service.rs +++ b/packages/fuel-indexer-tests/tests/service.rs @@ -3,7 +3,6 @@ use fuel_indexer::{Executor, IndexerConfig, WasmIndexExecutor}; use fuel_indexer_lib::{config::DatabaseConfig, manifest::Manifest}; use fuel_indexer_tests::fixtures::TestPostgresDb; use std::str::FromStr; -use std::sync::atomic::AtomicBool; #[tokio::test] async fn test_wasm_executor_can_meter_execution() { @@ -58,11 +57,9 @@ async fn test_wasm_executor_can_meter_execution() { .await .unwrap(); - let kill_switch = std::sync::Arc::new(AtomicBool::new(false)); - let blocks: Vec = vec![]; - if let Err(e) = executor.handle_events(kill_switch, blocks.clone()).await { + if let Err(e) = executor.handle_events(blocks.clone()).await { if let fuel_indexer::IndexerError::RuntimeError(e) = e { if let Some(e) = e.to_trap() { assert_eq!(e, wasmer_types::TrapCode::UnreachableCodeReached); diff --git a/packages/fuel-indexer/Cargo.toml b/packages/fuel-indexer/Cargo.toml index edf9fc26f..334653a3a 100644 --- a/packages/fuel-indexer/Cargo.toml +++ b/packages/fuel-indexer/Cargo.toml @@ -36,7 +36,6 @@ itertools = "0.10" sqlx = { version = "0.6", features = ["bigdecimal"] } thiserror = { workspace = true } tokio = { features = ["macros", "rt-multi-thread", "sync", "process"], workspace = true } -tokio-util = { workspace = true } tracing = { workspace = true } wasmer = "4" wasmer-middlewares = "4" diff --git a/packages/fuel-indexer/src/commands/run.rs b/packages/fuel-indexer/src/commands/run.rs index 1cdc347f2..3bd2b0e1d 100644 --- a/packages/fuel-indexer/src/commands/run.rs +++ b/packages/fuel-indexer/src/commands/run.rs @@ -8,55 +8,47 @@ use fuel_indexer_lib::{ }; use tokio::signal::unix::{signal, Signal, SignalKind}; use tokio::sync::mpsc::channel; -use tokio_util::sync::CancellationToken; -use tracing::info; +use tracing::{error, info}; #[cfg(feature = "api-server")] use fuel_indexer_api_server::api::WebApi; -// Returns a CancellationToken which will be notified when a shutdown signal -// have been reveived. -async fn shutdown_signal_handler() -> anyhow::Result { - let cancel_token = tokio_util::sync::CancellationToken::new(); - +// Returns a future which completes when a shutdown signal has been reveived. +fn shutdown_signal_handler() -> std::io::Result> { let mut sighup: Signal = signal(SignalKind::hangup())?; let mut sigterm: Signal = signal(SignalKind::terminate())?; let mut sigint: Signal = signal(SignalKind::interrupt())?; - tokio::spawn({ - let cancel_token = cancel_token.clone(); - - async move { - #[cfg(unix)] - { - tokio::select! { - _ = sighup.recv() => { - info!("Received SIGHUP. Stopping services."); - } - _ = sigterm.recv() => { - info!("Received SIGTERM. Stopping services."); - } - _ = sigint.recv() => { - info!("Received SIGINT. Stopping services."); - } + let future = async move { + #[cfg(unix)] + { + tokio::select! { + _ = sighup.recv() => { + info!("Received SIGHUP. Stopping services."); + } + _ = sigterm.recv() => { + info!("Received SIGTERM. Stopping services."); + } + _ = sigint.recv() => { + info!("Received SIGINT. Stopping services."); } - cancel_token.cancel(); } + } - #[cfg(not(unix))] - { - signal::ctrl_c().await?; - info!("Received CTRL+C. Stopping services."); - } + #[cfg(not(unix))] + { + signal::ctrl_c().await?; + info!("Received CTRL+C. Stopping services."); } - }); + }; - Ok(cancel_token) + Ok(future) } pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { - // for graceful shutdown - let cancel_token = shutdown_signal_handler().await?; + let mut subsystems: tokio::task::JoinSet<()> = tokio::task::JoinSet::new(); + + subsystems.spawn(shutdown_signal_handler()?); let IndexerArgs { manifest, @@ -138,16 +130,25 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { } } - let service_handle = tokio::spawn(service.run()); + subsystems.spawn(async { + let result = service.run().await; + if let Err(e) = result { + error!("Indexer Service failed: {e}"); + } + }); #[cfg(feature = "api-server")] - let web_handle = tokio::spawn(WebApi::build_and_run(config.clone(), pool, tx)); - - #[cfg(not(feature = "api-server"))] - let web_handle = tokio::spawn(futures::future::ready(())); + subsystems.spawn({ + let config = config.clone(); + async { + if let Err(e) = WebApi::build_and_run(config, pool, tx).await { + error!("Api Server failed: {e}"); + } + } + }); #[cfg(feature = "fuel-core-lib")] - let node_handle = { + { use fuel_core::service::{Config, FuelService}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -156,28 +157,18 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 4000), ..Config::local_node() }; - tokio::spawn(async move { - let node = FuelService::new_node(config).await.unwrap(); - Some(node) - }) - } else { - tokio::spawn(futures::future::ready(None)) + subsystems.spawn(async move { + if let Err(e) = FuelService::new_node(config).await { + error!("Fuen Node failed: {e}"); + }; + }); } }; - #[cfg(not(feature = "fuel-core-lib"))] - let node_handle = tokio::spawn(futures::future::ready(())); - - // spawn application as separate task - tokio::spawn({ - let cancel_token = cancel_token.clone(); - async move { - let _ = tokio::join!(service_handle, node_handle, web_handle); - cancel_token.cancel(); - } - }); - - cancel_token.cancelled().await; + // Each subsystem runs its own loop, and we require all subsystems for the + // Indexer service to operate correctly. If any of the subsystems stops + // running, the entire Indexer Service exits. + subsystems.join_next().await; if embedded_database { let name = postgres_database.unwrap_or(defaults::POSTGRES_DATABASE.to_string()); diff --git a/packages/fuel-indexer/src/executor.rs b/packages/fuel-indexer/src/executor.rs index 67267494d..726e34954 100644 --- a/packages/fuel-indexer/src/executor.rs +++ b/packages/fuel-indexer/src/executor.rs @@ -3,11 +3,7 @@ use crate::{ database::Database, ffi, queries::ClientExt, IndexerConfig, IndexerError, IndexerResult, }; -use async_std::{ - fs::File, - io::ReadExt, - sync::{Arc, Mutex}, -}; +use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use fuel_core_client::client::{ pagination::{PageDirection, PaginatedResult, PaginationRequest}, @@ -33,7 +29,7 @@ use std::{ sync::atomic::{AtomicBool, Ordering}, }; use tokio::{ - task::{spawn_blocking, JoinHandle}, + task::spawn_blocking, time::{sleep, Duration}, }; use tracing::{debug, error, info, warn}; @@ -78,24 +74,23 @@ impl From for Vec { // types in `fuel_core_client` don't compile to WASM. pub fn run_executor( config: &IndexerConfig, - manifest: &Manifest, mut executor: T, - kill_switch: Arc, ) -> impl Future { // TODO: https://github.com/FuelLabs/fuel-indexer/issues/286 - let end_block = manifest.end_block(); + let end_block = executor.manifest().end_block(); let stop_idle_indexers = config.stop_idle_indexers; - let indexer_uid = manifest.uid(); + let indexer_uid = executor.manifest().uid(); let node_block_page_size = config.node_block_page_size; - let fuel_node_addr = manifest + let fuel_node_addr = executor + .manifest() .fuel_client() .map(|x| x.to_string()) .unwrap_or(config.fuel_node.to_string()); // Where should we initially start when fetching blocks from the client? - let mut cursor = manifest.start_block().map(|x| { + let mut cursor = executor.manifest().start_block().map(|x| { if x > 1 { let decremented = x - 1; decremented.to_string() @@ -138,7 +133,7 @@ pub fn run_executor( loop { // If something else has signaled that this indexer should stop, then stop. - if kill_switch.load(Ordering::SeqCst) { + if executor.kill_switch().load(Ordering::SeqCst) { info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); break; } @@ -183,9 +178,7 @@ pub fn run_executor( } // The client responded with actual blocks, so attempt to index them. - let result = executor - .handle_events(kill_switch.clone(), block_info) - .await; + let result = executor.handle_events(block_info).await; if let Err(e) = result { // Run time metering is deterministic. There is no point in retrying. @@ -229,7 +222,7 @@ pub fn run_executor( cursor = next_cursor; // Again, check if something else has signaled that this indexer should stop, then stop. - if kill_switch.load(Ordering::SeqCst) { + if executor.kill_switch().load(Ordering::SeqCst) { info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); break; } @@ -480,11 +473,11 @@ pub trait Executor where Self: Sized, { - async fn handle_events( - &mut self, - kill_switch: Arc, - blocks: Vec, - ) -> IndexerResult<()>; + async fn handle_events(&mut self, blocks: Vec) -> IndexerResult<()>; + + fn manifest(&self) -> &Manifest; + + fn kill_switch(&self) -> &Arc; } /// WASM indexer runtime environment responsible for fetching/saving data to and from the database. @@ -535,6 +528,9 @@ where /// Function that handles events. handle_events_fn: fn(Vec, Arc>) -> F, + + /// Kill switch. When set to true, the indexer must stop execution. + kill_switch: Arc, } impl NativeIndexExecutor @@ -557,31 +553,23 @@ where ) .await?; db.load_schema(version).await?; + let kill_switch = Arc::new(AtomicBool::new(false)); Ok(Self { db: Arc::new(Mutex::new(db)), manifest: manifest.to_owned(), handle_events_fn, + kill_switch, }) } /// Create a new `NativeIndexExecutor`. - pub async fn create> + Send + 'static>( + pub async fn create( config: &IndexerConfig, manifest: &Manifest, pool: IndexerConnectionPool, - handle_events: fn(Vec, Arc>) -> T, - ) -> IndexerResult<(JoinHandle<()>, ExecutorSource, Arc)> { - let executor = - NativeIndexExecutor::new(manifest, pool.clone(), config, handle_events) - .await?; - let kill_switch = Arc::new(AtomicBool::new(false)); - let handle = tokio::spawn(run_executor( - config, - manifest, - executor, - kill_switch.clone(), - )); - Ok((handle, ExecutorSource::Manifest, kill_switch)) + handle_events: fn(Vec, Arc>) -> F, + ) -> IndexerResult { + NativeIndexExecutor::new(manifest, pool.clone(), config, handle_events).await } } @@ -591,11 +579,7 @@ where F: Future> + Send, { /// Handle events for native executor. - async fn handle_events( - &mut self, - kill_switch: Arc, - blocks: Vec, - ) -> IndexerResult<()> { + async fn handle_events(&mut self, blocks: Vec) -> IndexerResult<()> { self.db.lock().await.start_transaction().await?; let res = (self.handle_events_fn)(blocks, self.db.clone()).await; let uid = self.manifest.uid(); @@ -605,7 +589,7 @@ where return Err(IndexerError::NativeExecutionRuntimeError); } else { // Do not commit if kill switch has been triggered. - if kill_switch.load(Ordering::SeqCst) { + if self.kill_switch.load(Ordering::SeqCst) { self.db.lock().await.revert_transaction().await?; } else { self.db.lock().await.commit_transaction().await?; @@ -613,6 +597,14 @@ where } Ok(()) } + + fn kill_switch(&self) -> &Arc { + &self.kill_switch + } + + fn manifest(&self) -> &Manifest { + &self.manifest + } } /// WASM executors are the primary means of execution. @@ -639,6 +631,9 @@ pub struct WasmIndexExecutor { /// Manifest of the indexer. manifest: Manifest, + + /// Kill switch. When set to true, the indexer must stop execution. + kill_switch: Arc, } impl WasmIndexExecutor { @@ -715,6 +710,8 @@ impl WasmIndexExecutor { db.lock().await.load_schema(schema_version).await?; + let kill_switch = Arc::new(AtomicBool::new(false)); + Ok(WasmIndexExecutor { instance, _module: module, @@ -722,6 +719,7 @@ impl WasmIndexExecutor { db: db.clone(), metering_points: config.metering_points, manifest: manifest.clone(), + kill_switch, }) } @@ -742,76 +740,21 @@ impl WasmIndexExecutor { pub async fn create( config: &IndexerConfig, manifest: &Manifest, - exec_source: ExecutorSource, pool: IndexerConnectionPool, schema_version: String, - ) -> IndexerResult<(JoinHandle<()>, ExecutorSource, Arc)> { - let killer = Arc::new(AtomicBool::new(false)); + wasm_bytes: impl AsRef<[u8]>, + ) -> IndexerResult { let uid = manifest.uid(); - match &exec_source { - ExecutorSource::Manifest => match manifest.module() { - crate::Module::Wasm(ref module) => { - let mut bytes = Vec::::new(); - let mut file = File::open(module).await?; - file.read_to_end(&mut bytes).await?; - - match WasmIndexExecutor::new( - config, - manifest, - bytes.clone(), - pool, - schema_version, - ) - .await - { - Ok(executor) => { - let handle = tokio::spawn(run_executor( - config, - manifest, - executor, - killer.clone(), - )); - - Ok((handle, ExecutorSource::Registry(bytes), killer)) - } - Err(e) => { - error!( - "Could not instantiate WasmIndexExecutor({uid}) from ExecutorSource::Manifest: {e:?}." - ); - Err(IndexerError::WasmExecutionInstantiationError) - } - } - } - crate::Module::Native => { - Err(IndexerError::NativeExecutionInstantiationError) - } - }, - ExecutorSource::Registry(bytes) => { - match WasmIndexExecutor::new( - config, - manifest, - bytes, - pool, - schema_version, - ) - .await - { - Ok(executor) => { - let handle = tokio::spawn(run_executor( - config, - manifest, - executor, - killer.clone(), - )); - - Ok((handle, exec_source, killer)) - } - Err(e) => { - error!("Could not instantiate WasmIndexExecutor({uid}) from ExecutorSource::Registry: {e:?}."); - Err(IndexerError::WasmExecutionInstantiationError) - } - } + match WasmIndexExecutor::new(config, manifest, wasm_bytes, pool, schema_version) + .await + { + Ok(executor) => Ok(executor), + Err(e) => { + error!( + "Could not instantiate WasmIndexExecutor({uid}) from ExecutorSource::Manifest: {e:?}." + ); + Err(IndexerError::WasmExecutionInstantiationError) } } } @@ -868,11 +811,7 @@ impl WasmIndexExecutor { #[async_trait] impl Executor for WasmIndexExecutor { /// Trigger a WASM event handler, passing in a serialized event struct. - async fn handle_events( - &mut self, - kill_switch: Arc, - blocks: Vec, - ) -> IndexerResult<()> { + async fn handle_events(&mut self, blocks: Vec) -> IndexerResult<()> { if blocks.is_empty() { return Ok(()); } @@ -923,7 +862,7 @@ impl Executor for WasmIndexExecutor { } } else { // Do not commit if kill switch has been triggered. - if kill_switch.load(Ordering::SeqCst) { + if self.kill_switch.load(Ordering::SeqCst) { self.db.lock().await.revert_transaction().await?; } else { self.db.lock().await.commit_transaction().await?; @@ -932,4 +871,12 @@ impl Executor for WasmIndexExecutor { Ok(()) } + + fn kill_switch(&self) -> &Arc { + &self.kill_switch + } + + fn manifest(&self) -> &Manifest { + &self.manifest + } } diff --git a/packages/fuel-indexer/src/service.rs b/packages/fuel-indexer/src/service.rs index 85cc5f29d..2a4a0fbcb 100644 --- a/packages/fuel-indexer/src/service.rs +++ b/packages/fuel-indexer/src/service.rs @@ -1,24 +1,21 @@ use crate::{ - executor::{ExecutorSource, NativeIndexExecutor, WasmIndexExecutor}, - Database, IndexerConfig, IndexerError, IndexerResult, Manifest, + executor::{NativeIndexExecutor, WasmIndexExecutor}, + Database, Executor, IndexerConfig, IndexerError, IndexerResult, Manifest, }; use async_std::sync::{Arc, Mutex}; +use async_std::{fs::File, io::ReadExt}; use fuel_indexer_database::{ queries, types::IndexerAssetType, IndexerConnection, IndexerConnectionPool, }; use fuel_indexer_lib::{defaults, utils::ServiceRequest}; use fuel_indexer_schema::db::manager::SchemaManager; use fuel_indexer_types::fuel::BlockData; -use futures::{ - stream::{FuturesUnordered, StreamExt}, - Future, -}; +use futures::Future; use std::collections::HashMap; use std::marker::Send; use std::sync::atomic::{AtomicBool, Ordering}; use tokio::{ sync::mpsc::Receiver, - task::JoinHandle, time::{sleep, Duration}, }; use tracing::{debug, error, info, warn}; @@ -34,8 +31,11 @@ pub struct IndexerService { /// Schema manager used to manage the database schema. manager: SchemaManager, - /// Handles to the spawned indexers. - handles: HashMap>, + /// Tasks for the spawned indexers. + tasks: tokio::task::JoinSet<()>, + + /// Abort handles for the spawned indexers. + abort_handles: HashMap, /// Channel used to receive `ServiceRequest`s. rx: Receiver, @@ -57,8 +57,9 @@ impl IndexerService { config, pool, manager, - handles: HashMap::default(), killers: HashMap::default(), + tasks: tokio::task::JoinSet::new(), + abort_handles: HashMap::new(), rx, }) } @@ -125,20 +126,33 @@ impl IndexerService { ) .await?; + info!("FOO"); let start_block = get_start_block(&mut conn, &manifest).await?; manifest.set_start_block(start_block); - let (handle, exec_source, killer) = WasmIndexExecutor::create( + let wasm_bytes = match manifest.module() { + crate::Module::Wasm(ref module) => { + let mut bytes = Vec::::new(); + let mut file = File::open(module).await?; + file.read_to_end(&mut bytes).await?; + bytes + } + crate::Module::Native => { + return Err(IndexerError::NativeExecutionInstantiationError) + } + }; + + let executor = WasmIndexExecutor::create( &self.config, &manifest, - ExecutorSource::Manifest, self.pool.clone(), schema_version, + wasm_bytes.clone(), ) .await?; let mut items = vec![ - (IndexerAssetType::Wasm, exec_source.into()), + (IndexerAssetType::Wasm, wasm_bytes), (IndexerAssetType::Manifest, manifest.clone().into()), (IndexerAssetType::Schema, schema_bytes), ]; @@ -166,8 +180,8 @@ impl IndexerService { manifest.namespace(), manifest.identifier() ); - self.handles.insert(manifest.uid(), handle); - self.killers.insert(manifest.uid(), killer); + + self.start_executor(executor); Ok(()) } @@ -183,18 +197,18 @@ impl IndexerService { let start_block = get_start_block(&mut conn, &manifest).await.unwrap_or(1); manifest.set_start_block(start_block); - if let Ok((handle, _module_bytes, killer)) = WasmIndexExecutor::create( + if let Ok(executor) = WasmIndexExecutor::create( &self.config, &manifest, - ExecutorSource::Registry(assets.wasm.bytes), self.pool.clone(), assets.schema.digest, + assets.wasm.bytes, ) .await { info!("Registered Indexer({})", manifest.uid()); - self.handles.insert(manifest.uid(), handle); - self.killers.insert(manifest.uid(), killer); + + self.start_executor(executor); } else { error!( "Failed to register Indexer({}) from registry.", @@ -237,7 +251,7 @@ impl IndexerService { manifest.set_start_block(start_block); let uid = manifest.uid(); - let (handle, _module_bytes, killer) = NativeIndexExecutor::::create( + let executor = NativeIndexExecutor::::create( &self.config, &manifest, self.pool.clone(), @@ -247,131 +261,109 @@ impl IndexerService { info!("Registered NativeIndex({})", uid); - self.handles.insert(uid.clone(), handle); - self.killers.insert(uid, killer); + self.start_executor(executor); + Ok(()) } - /// Kick it off! - pub async fn run(self) { - let IndexerService { - handles, - rx, - pool, - config, - killers, - .. - } = self; - - let futs = Arc::new(Mutex::new(FuturesUnordered::from_iter( - handles.into_values(), - ))); - - let _ = tokio::spawn(create_service_task( - rx, - config.clone(), - pool.clone(), - futs.clone(), - killers, - )) - .await - .unwrap(); - - while let Some(fut) = futs.lock().await.next().await { - info!("Retired a future {fut:?}"); - } - } -} + /// Kick it off! Run the indexer service loop, listening to service messages primarily coming from the web API. + pub async fn run(mut self) -> IndexerResult<()> { + loop { + match self.rx.try_recv() { + Ok(service_request) => match service_request { + ServiceRequest::Reload(request) => { + let mut conn = self.pool.acquire().await?; + + match queries::get_indexer_id( + &mut conn, + &request.namespace, + &request.identifier, + ) + .await + { + Ok(id) => { + let assets = + queries::latest_assets_for_indexer(&mut conn, &id) + .await?; + let mut manifest = + Manifest::try_from(&assets.manifest.bytes)?; + + let start_block = + get_start_block(&mut conn, &manifest).await?; + manifest.set_start_block(start_block); + + if let Some(killer_for_prev_executor) = + self.killers.remove(&manifest.uid()) + { + let uid = manifest.uid(); + info!("Indexer({uid}) is being replaced. Stopping previous version of Indexer({uid})."); + killer_for_prev_executor + .store(true, Ordering::SeqCst); + } -/// Create a tokio task used to listen to service messages primarily coming from the web API. -async fn create_service_task( - mut rx: Receiver, - config: IndexerConfig, - pool: IndexerConnectionPool, - futs: Arc>>>, - mut killers: HashMap>, -) -> IndexerResult<()> { - loop { - let futs = futs.lock().await; - match rx.try_recv() { - Ok(service_request) => match service_request { - ServiceRequest::Reload(request) => { - let mut conn = pool.acquire().await?; - - match queries::get_indexer_id( - &mut conn, - &request.namespace, - &request.identifier, - ) - .await - { - Ok(id) => { - let assets = - queries::latest_assets_for_indexer(&mut conn, &id) - .await?; - let mut manifest = - Manifest::try_from(&assets.manifest.bytes)?; - - let start_block = - get_start_block(&mut conn, &manifest).await?; - manifest.set_start_block(start_block); - - match WasmIndexExecutor::create( - &config, - &manifest, - ExecutorSource::Registry(assets.wasm.bytes), - pool.clone(), - assets.schema.digest, - ) - .await - { - Ok((handle, _module_bytes, killer)) => { - futs.push(handle); - - if let Some(killer_for_prev_executor) = - killers.insert(manifest.uid(), killer) - { - let uid = manifest.uid(); - info!("Indexer({uid}) was replaced. Stopping previous version of Indexer({uid})."); - killer_for_prev_executor - .store(true, Ordering::SeqCst); + match WasmIndexExecutor::create( + &self.config, + &manifest, + self.pool.clone(), + assets.schema.digest, + assets.wasm.bytes, + ) + .await + { + Ok(executor) => self.start_executor(executor), + Err(e) => { + error!( + "Failed to reload Indexer({}.{}): {e:?}", + &request.namespace, &request.identifier + ); + return Ok(()); } } - Err(e) => { - error!( - "Failed to reload Indexer({}.{}): {e:?}", - &request.namespace, &request.identifier - ); - return Ok(()); - } } - } - Err(e) => { - error!( - "Failed to find Indexer({}.{}): {}", - &request.namespace, &request.identifier, e - ); + Err(e) => { + error!( + "Failed to find Indexer({}.{}): {}", + &request.namespace, &request.identifier, e + ); - continue; + continue; + } } } - } - ServiceRequest::Stop(request) => { - let uid = format!("{}.{}", request.namespace, request.identifier); - - if let Some(killer) = killers.remove(&uid) { - killer.store(true, Ordering::SeqCst); - } else { - warn!("Stop Indexer: No indexer with the name Indexer({uid})"); + ServiceRequest::Stop(request) => { + let uid = format!("{}.{}", request.namespace, request.identifier); + + if let Some(killer) = self.killers.remove(&uid) { + killer.store(true, Ordering::SeqCst); + } else { + warn!( + "Stop Indexer: No indexer with the name Indexer({uid})" + ); + } } + }, + Err(e) => { + debug!("No service request to handle: {e:?}."); + sleep(Duration::from_secs(defaults::IDLE_SERVICE_WAIT_SECS)).await; } - }, - Err(e) => { - debug!("No service request to handle: {e:?}."); - sleep(Duration::from_secs(defaults::IDLE_SERVICE_WAIT_SECS)).await; } } } + + // Spawn and register a tokio::task running the Executor loop, as well as + // the kill switch and the abort handle.ß + fn start_executor(&mut self, executor: T) { + let uid = executor.manifest().uid(); + + self.killers + .insert(uid.clone(), executor.kill_switch().clone()); + + let abort_handle = self + .tasks + .spawn(crate::executor::run_executor(&self.config, executor)); + + self.abort_handles.insert(uid, abort_handle); + } } /// Determine the starting block for this indexer.