From d97ccbcc1963cd36d12a1f6f88f3bfd41f6ae55b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Mon, 4 Sep 2023 17:54:29 +0200 Subject: [PATCH] fuel-indexer: prune finished tasks from tasks set --- packages/fuel-indexer/src/service.rs | 141 ++++++++++++++------------- 1 file changed, 71 insertions(+), 70 deletions(-) diff --git a/packages/fuel-indexer/src/service.rs b/packages/fuel-indexer/src/service.rs index 319befb71..3e33515b7 100644 --- a/packages/fuel-indexer/src/service.rs +++ b/packages/fuel-indexer/src/service.rs @@ -264,82 +264,84 @@ impl IndexerService { /// Kick it off! Run the indexer service loop, listening to service messages primarily coming from the web API. pub async fn run(mut self) -> IndexerResult<()> { loop { - match self.rx.try_recv() { - Ok(service_request) => match service_request { - ServiceRequest::Reload(request) => { - let mut conn = self.pool.acquire().await?; - - match queries::get_indexer_id( - &mut conn, - &request.namespace, - &request.identifier, - ) - .await - { - Ok(id) => { - let assets = - queries::latest_assets_for_indexer(&mut conn, &id) - .await?; - let mut manifest = - Manifest::try_from(&assets.manifest.bytes)?; - - let start_block = - get_start_block(&mut conn, &manifest).await?; - manifest.set_start_block(start_block); - - if let Some(killer_for_prev_executor) = - self.killers.remove(&manifest.uid()) - { - let uid = manifest.uid(); - info!("Indexer({uid}) is being replaced. Stopping previous version of Indexer({uid})."); - killer_for_prev_executor - .store(true, Ordering::SeqCst); - } + tokio::select! { + // Calling join_next will remove finished tasks from the set. + Some(Err(e)) = self.tasks.join_next() => { + error!("Error retiring indexer task {e}"); + } + Some(service_request) = self.rx.recv() => { + match service_request { + ServiceRequest::Reload(request) => { + let mut conn = self.pool.acquire().await?; + + match queries::get_indexer_id( + &mut conn, + &request.namespace, + &request.identifier, + ) + .await + { + Ok(id) => { + let assets = + queries::latest_assets_for_indexer(&mut conn, &id) + .await?; + let mut manifest = + Manifest::try_from(&assets.manifest.bytes)?; + + let start_block = + get_start_block(&mut conn, &manifest).await?; + manifest.set_start_block(start_block); + + if let Some(killer_for_prev_executor) = + self.killers.remove(&manifest.uid()) + { + let uid = manifest.uid(); + info!("Indexer({uid}) is being replaced. Stopping previous version of Indexer({uid})."); + killer_for_prev_executor + .store(true, Ordering::SeqCst); + } - match WasmIndexExecutor::create( - &self.config, - &manifest, - self.pool.clone(), - assets.schema.digest, - assets.wasm.bytes, - ) - .await - { - Ok(executor) => self.start_executor(executor), - Err(e) => { - error!( - "Failed to reload Indexer({}.{}): {e:?}", - &request.namespace, &request.identifier - ); - return Ok(()); + match WasmIndexExecutor::create( + &self.config, + &manifest, + self.pool.clone(), + assets.schema.digest, + assets.wasm.bytes, + ) + .await + { + Ok(executor) => self.start_executor(executor), + Err(e) => { + error!( + "Failed to reload Indexer({}.{}): {e:?}", + &request.namespace, &request.identifier + ); + return Ok(()); + } } } - } - Err(e) => { - error!( - "Failed to find Indexer({}.{}): {}", - &request.namespace, &request.identifier, e - ); + Err(e) => { + error!( + "Failed to find Indexer({}.{}): {}", + &request.namespace, &request.identifier, e + ); - continue; + continue; + } } } - } - ServiceRequest::Stop(request) => { - let uid = format!("{}.{}", request.namespace, request.identifier); - - if let Some(killer) = self.killers.remove(&uid) { - killer.store(true, Ordering::SeqCst); - } else { - warn!( - "Stop Indexer: No indexer with the name Indexer({uid})" - ); + ServiceRequest::Stop(request) => { + let uid = format!("{}.{}", request.namespace, request.identifier); + + if let Some(killer) = self.killers.remove(&uid) { + killer.store(true, Ordering::SeqCst); + } else { + warn!( + "Stop Indexer: No indexer with the name Indexer({uid})" + ); + } } } - }, - Err(e) => { - debug!("No service request to handle: {e:?}."); - sleep(Duration::from_secs(defaults::IDLE_SERVICE_WAIT_SECS)).await; } } } @@ -353,8 +355,7 @@ impl IndexerService { self.killers .insert(uid.clone(), executor.kill_switch().clone()); - self - .tasks + self.tasks .spawn(crate::executor::run_executor(&self.config, executor)); } }