From e74645e0d418db7834e8a271ad50edcaaf055624 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Wed, 6 Sep 2023 16:57:05 +0200 Subject: [PATCH] early exit and kill switch --- packages/fuel-indexer-benchmarks/src/lib.rs | 4 +- packages/fuel-indexer-lib/src/lib.rs | 10 +++ packages/fuel-indexer-macros/src/decoder.rs | 4 +- packages/fuel-indexer-plugin/src/wasm.rs | 25 ++---- packages/fuel-indexer-tests/tests/service.rs | 4 +- packages/fuel-indexer/src/database.rs | 59 +++++++------- packages/fuel-indexer/src/executor.rs | 40 ++++++--- packages/fuel-indexer/src/ffi.rs | 86 ++++++++++++++++---- 8 files changed, 156 insertions(+), 76 deletions(-) diff --git a/packages/fuel-indexer-benchmarks/src/lib.rs b/packages/fuel-indexer-benchmarks/src/lib.rs index 22eafb10f..899341682 100644 --- a/packages/fuel-indexer-benchmarks/src/lib.rs +++ b/packages/fuel-indexer-benchmarks/src/lib.rs @@ -7,7 +7,7 @@ use fuel_indexer::{ use fuel_indexer_database::IndexerConnectionPool; use fuel_indexer_lib::config::DatabaseConfig; use fuel_indexer_tests::fixtures::TestPostgresDb; -use std::{str::FromStr, sync::atomic::AtomicBool}; +use std::{str::FromStr, sync::atomic::AtomicBool, sync::Arc}; /// Location of Fuel node to be used for block retrieval. pub const NODE_URL: &str = "beta-4.fuel.network:80"; @@ -40,6 +40,7 @@ async fn setup_wasm_executor( db_url: String, pool: IndexerConnectionPool, ) -> Result { + let kill_switch = Arc::new(AtomicBool::new(false)); config.database = DatabaseConfig::from_str(&db_url).unwrap(); let schema_version = manifest .graphql_schema_content() @@ -52,6 +53,7 @@ async fn setup_wasm_executor( manifest.module_bytes().unwrap(), pool, schema_version, + kill_switch, ) .await .expect("Could not setup WASM executor"); diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index 127d29dd0..82b9bf12e 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -44,6 +44,8 @@ pub enum WasmIndexerError { UnableToSaveListType, UninitializedMemory, UnableToFetchLogString, + KillSwitch, + DatabaseError, GeneralError, } @@ -57,6 +59,8 @@ impl From for WasmIndexerError { 4 => Self::UnableToSaveListType, 5 => Self::UninitializedMemory, 6 => Self::UnableToFetchLogString, + 7 => Self::KillSwitch, + 8 => Self::DatabaseError, _ => Self::GeneralError, } } @@ -83,6 +87,12 @@ impl std::fmt::Display for WasmIndexerError { Self::UnableToFetchLogString => { write!(f, "Failed to fetch log string") } + Self::KillSwitch => { + write!(f, "Kill switch has been triggered") + } + Self::DatabaseError => { + write!(f, "Failed performing a database operation") + } Self::GeneralError => write!(f, "A WASM error occurred"), } } diff --git a/packages/fuel-indexer-macros/src/decoder.rs b/packages/fuel-indexer-macros/src/decoder.rs index f360976be..88bcfdb81 100644 --- a/packages/fuel-indexer-macros/src/decoder.rs +++ b/packages/fuel-indexer-macros/src/decoder.rs @@ -833,7 +833,7 @@ impl From for TokenStream { .map(|query| query.to_string()) .collect::>(); - d.lock().await.put_many_to_many_record(queries).await; + d.lock().await.put_many_to_many_record(queries).await.unwrap(); } } None => {} @@ -868,7 +868,7 @@ impl From for TokenStream { Self::TYPE_ID, self.to_row(), serialize(&self.to_row()) - ).await; + ).await.unwrap(); } None => {}, } diff --git a/packages/fuel-indexer-plugin/src/wasm.rs b/packages/fuel-indexer-plugin/src/wasm.rs index 889dc1114..3fd726016 100644 --- a/packages/fuel-indexer-plugin/src/wasm.rs +++ b/packages/fuel-indexer-plugin/src/wasm.rs @@ -24,8 +24,8 @@ extern "C" { // log_data prints information to stdout. fn ff_log_data(ptr: *const u8, len: u32, log_level: u32); // Put methods have error codes. - fn ff_put_object(type_id: i64, ptr: *const u8, len: u32) -> i32; - fn ff_put_many_to_many_record(ptr: *const u8, len: u32) -> i32; + fn ff_put_object(type_id: i64, ptr: *const u8, len: u32); + fn ff_put_many_to_many_record(ptr: *const u8, len: u32); } // TODO: more to do here, hook up to 'impl log::Log for Logger' @@ -75,7 +75,7 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { } /// Saves a record that contains a list of multiple elements. - fn save_many_to_many(&self) -> Result<(), WasmIndexerError> { + fn save_many_to_many(&self) { if let Some(meta) = Self::JOIN_METADATA { let items = meta.iter().filter_map(|x| x.clone()).collect::>(); let row = self.to_row(); @@ -86,16 +86,8 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { .collect::>(); let bytes = serialize(&queries); unsafe { - let res = ff_put_many_to_many_record(bytes.as_ptr(), bytes.len() as u32); - - if res != 0 { - return Err(WasmIndexerError::UnableToSaveListType); - } - - Ok(()) + ff_put_many_to_many_record(bytes.as_ptr(), bytes.len() as u32); } - } else { - Ok(()) } } @@ -135,17 +127,14 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { /// Saves a record. fn save(&self) { - self.save_unsafe().unwrap() + self.save_unsafe() } /// Saves a record through the FFI with the WASM runtime and checks for errors. - fn save_unsafe(&self) -> Result<(), WasmIndexerError> { + fn save_unsafe(&self) { unsafe { let buf = serialize(&self.to_row()); - let res = ff_put_object(Self::TYPE_ID, buf.as_ptr(), buf.len() as u32); - if res != 0 { - return Err(WasmIndexerError::from(res)); - } + ff_put_object(Self::TYPE_ID, buf.as_ptr(), buf.len() as u32); } self.save_many_to_many() diff --git a/packages/fuel-indexer-tests/tests/service.rs b/packages/fuel-indexer-tests/tests/service.rs index 20a306aff..3dda8bdda 100644 --- a/packages/fuel-indexer-tests/tests/service.rs +++ b/packages/fuel-indexer-tests/tests/service.rs @@ -3,7 +3,7 @@ use fuel_indexer::{Executor, IndexerConfig, WasmIndexExecutor}; use fuel_indexer_lib::{config::DatabaseConfig, manifest::Manifest}; use fuel_indexer_tests::fixtures::TestPostgresDb; use std::str::FromStr; -use std::sync::atomic::AtomicBool; +use std::sync::{atomic::AtomicBool, Arc}; #[tokio::test] async fn test_wasm_executor_can_meter_execution() { @@ -48,12 +48,14 @@ async fn test_wasm_executor_can_meter_execution() { .version() .to_string(); + let kill_switch = Arc::new(AtomicBool::new(false)); let mut executor = WasmIndexExecutor::new( &config, &manifest, bytes.clone(), pool, schema_version, + kill_switch, ) .await .unwrap(); diff --git a/packages/fuel-indexer/src/database.rs b/packages/fuel-indexer/src/database.rs index 242d0deed..c0a6c91d7 100644 --- a/packages/fuel-indexer/src/database.rs +++ b/packages/fuel-indexer/src/database.rs @@ -64,29 +64,28 @@ impl Database { let conn = self.pool.acquire().await?; self.stashed = Some(conn); debug!("Connection stashed as: {:?}", self.stashed); - let conn = self.stashed.as_mut().expect( - "No stashed connection for start transaction. Was a transaction started?", - ); + let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( + "No stashed connection for start transaction. Was a transaction started?" + .to_string(), + ))?; let result = queries::start_transaction(conn).await?; Ok(result) } /// Commit transaction to database. pub async fn commit_transaction(&mut self) -> IndexerResult { - let conn = self - .stashed - .as_mut() - .expect("No stashed connection for commit. Was a transaction started?"); + let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( + "No stashed connection for commit. Was a transaction started?".to_string(), + ))?; let res = queries::commit_transaction(conn).await?; Ok(res) } /// Revert open transaction. pub async fn revert_transaction(&mut self) -> IndexerResult { - let conn = self - .stashed - .as_mut() - .expect("No stashed connection for revert. Was a transaction started?"); + let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( + "No stashed connection for revert. Was a transaction started?".to_string(), + ))?; let res = queries::revert_transaction(conn).await?; Ok(res) } @@ -132,11 +131,11 @@ impl Database { type_id: i64, columns: Vec, bytes: Vec, - ) { + ) -> IndexerResult<()> { let table = match self.tables.get(&type_id) { Some(t) => t, None => { - error!( + return Err(crate::IndexerError::Unknown(format!( r#"TypeId({type_id}) not found in tables: {:?}. Does the schema version in SchemaManager::new_schema match the schema version in Database::load_schema? @@ -145,8 +144,7 @@ Do your WASM modules need to be rebuilt? "#, self.tables, - ); - return; + ))); } }; @@ -162,18 +160,17 @@ Do your WASM modules need to be rebuilt? let query_text = format_sql_query(self.upsert_query(table, &columns, inserts, updates)); - let conn = self - .stashed - .as_mut() - .expect("No stashed connection for put. Was a transaction started?"); + let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( + "No stashed connection for put. Was a transaction started?".to_string(), + ))?; if self.config.verbose { info!("{query_text}"); } - if let Err(e) = queries::put_object(conn, query_text, bytes).await { - error!("Failed to put_object: {e:?}"); - } + queries::put_object(conn, query_text, bytes).await?; + + Ok(()) } /// Get an object from the database. @@ -261,20 +258,22 @@ Do your WASM modules need to be rebuilt? /// /// There are multiple queries here because a single parent `TypeDefinition` can have several /// many-to-many relationships with children `TypeDefinition`s. - pub async fn put_many_to_many_record(&mut self, queries: Vec) { - let conn = self - .stashed - .as_mut() - .expect("No stashed connection for put. Was a transaction started?"); + pub async fn put_many_to_many_record( + &mut self, + queries: Vec, + ) -> IndexerResult<()> { + let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( + "No stashed connection for put. Was a transaction started?".to_string(), + ))?; for query in queries { if self.config.verbose { info!("{query}"); } - if let Err(e) = queries::put_many_to_many_record(conn, query).await { - error!("Failed to put_many_to_many_record: {e:?}"); - } + queries::put_many_to_many_record(conn, query).await?; } + + Ok(()) } } diff --git a/packages/fuel-indexer/src/executor.rs b/packages/fuel-indexer/src/executor.rs index 67267494d..914a28c61 100644 --- a/packages/fuel-indexer/src/executor.rs +++ b/packages/fuel-indexer/src/executor.rs @@ -16,7 +16,9 @@ use fuel_core_client::client::{ FuelClient, }; use fuel_indexer_database::IndexerConnectionPool; -use fuel_indexer_lib::{defaults::*, manifest::Manifest, utils::serialize}; +use fuel_indexer_lib::{ + defaults::*, manifest::Manifest, utils::serialize, WasmIndexerError, +}; use fuel_indexer_types::{ fuel::{field::*, *}, scalar::{Bytes32, HexString}, @@ -27,6 +29,7 @@ use fuel_vm::state::ProgramState as ClientProgramState; use futures::Future; use itertools::Itertools; use std::{ + error::Error, marker::{Send, Sync}, path::Path, str::FromStr, @@ -187,6 +190,12 @@ pub fn run_executor( .handle_events(kill_switch.clone(), block_info) .await; + // If the kill switch has been triggered, the executor exits early. + if kill_switch.load(Ordering::SeqCst) { + info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); + break; + } + if let Err(e) = result { // Run time metering is deterministic. There is no point in retrying. if let IndexerError::RunTimeLimitExceededError = e { @@ -228,12 +237,6 @@ pub fn run_executor( // If we make it this far, we always go to the next page. cursor = next_cursor; - // Again, check if something else has signaled that this indexer should stop, then stop. - if kill_switch.load(Ordering::SeqCst) { - info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); - break; - } - // Since we had successful call, we reset the retry count. consecutive_retries = 0; } @@ -501,6 +504,10 @@ pub struct IndexEnv { /// Reference to the connected database. pub db: Arc>, + + /// Kill switch for this indexer. When true, the indexer service indicated + /// that the indexer is being terminated. + pub kill_switch: Arc, } impl IndexEnv { @@ -509,6 +516,7 @@ impl IndexEnv { pool: IndexerConnectionPool, manifest: &Manifest, config: &IndexerConfig, + kill_switch: Arc, ) -> IndexerResult { let db = Database::new(pool, manifest, config).await; Ok(IndexEnv { @@ -516,6 +524,7 @@ impl IndexEnv { alloc: None, dealloc: None, db: Arc::new(Mutex::new(db)), + kill_switch, }) } } @@ -649,6 +658,7 @@ impl WasmIndexExecutor { wasm_bytes: impl AsRef<[u8]>, pool: IndexerConnectionPool, schema_version: String, + kill_switch: Arc, ) -> IndexerResult { let mut compiler_config = Cranelift::new(); @@ -661,7 +671,7 @@ impl WasmIndexExecutor { compiler_config.push_middleware(metering); } - let idx_env = IndexEnv::new(pool, manifest, config).await?; + let idx_env = IndexEnv::new(pool, manifest, config, kill_switch).await?; let db: Arc> = idx_env.db.clone(); @@ -730,12 +740,13 @@ impl WasmIndexExecutor { p: impl AsRef, config: Option, pool: IndexerConnectionPool, + kill_switch: Arc, ) -> IndexerResult { let config = config.unwrap_or_default(); let manifest = Manifest::from_file(p)?; let bytes = manifest.module_bytes()?; let schema_version = manifest.graphql_schema_content()?.version().to_string(); - Self::new(&config, &manifest, bytes, pool, schema_version).await + Self::new(&config, &manifest, bytes, pool, schema_version, kill_switch).await } /// Create a new `WasmIndexExecutor`. @@ -762,6 +773,7 @@ impl WasmIndexExecutor { bytes.clone(), pool, schema_version, + killer.clone(), ) .await { @@ -794,6 +806,7 @@ impl WasmIndexExecutor { bytes, pool, schema_version, + killer.clone(), ) .await { @@ -917,7 +930,14 @@ impl Executor for WasmIndexExecutor { self.db.lock().await.revert_transaction().await?; return Err(IndexerError::RunTimeLimitExceededError); } else { - error!("WasmIndexExecutor({uid}) WASM execution failed: {e:?}."); + if let Some(e) = e + .source() + .and_then(|e| e.downcast_ref::()) + { + error!("Indexer({uid}) WASM execution failed: {e}."); + } else { + error!("Indexer({uid}) WASM execution failed: {e:?}."); + }; self.db.lock().await.revert_transaction().await?; return Err(IndexerError::from(e)); } diff --git a/packages/fuel-indexer/src/ffi.rs b/packages/fuel-indexer/src/ffi.rs index 6dc880840..59a1121c7 100644 --- a/packages/fuel-indexer/src/ffi.rs +++ b/packages/fuel-indexer/src/ffi.rs @@ -70,8 +70,23 @@ fn get_object_id(mem: &MemoryView, ptr: u32, len: u32) -> FFIResult { } /// Log the string at the given pointer to stdout. -fn log_data(mut env: FunctionEnvMut, ptr: u32, len: u32, log_level: u32) { +fn log_data( + mut env: FunctionEnvMut, + ptr: u32, + len: u32, + log_level: u32, +) -> Result<(), WasmIndexerError> { let (idx_env, store) = env.data_and_store_mut(); + + if idx_env + .kill_switch + .load(std::sync::atomic::Ordering::SeqCst) + { + // If the kill switch has been flipped, returning an error will cause an + // early termination of WASM execution. + return Err(WasmIndexerError::KillSwitch); + } + let mem = idx_env .memory .as_mut() @@ -89,6 +104,8 @@ fn log_data(mut env: FunctionEnvMut, ptr: u32, len: u32, log_level: u3 LOG_LEVEL_TRACE => trace!("{log_string}",), l => panic!("Invalid log level: {l}"), } + + Ok(()) } /// Fetch the given type at the given pointer from memory. @@ -99,9 +116,18 @@ fn get_object( type_id: i64, ptr: u32, len_ptr: u32, -) -> u32 { +) -> Result { let (idx_env, mut store) = env.data_and_store_mut(); + if idx_env + .kill_switch + .load(std::sync::atomic::Ordering::SeqCst) + { + // If the kill switch has been flipped, returning an error will cause an + // early termination of WASM execution. + return Err(WasmIndexerError::KillSwitch); + } + let mem = idx_env .memory .as_mut() @@ -138,9 +164,9 @@ fn get_object( mem.data_unchecked_mut()[range].copy_from_slice(&bytes); } - result + Ok(result) } else { - 0 + Ok(0) } } @@ -152,13 +178,22 @@ fn put_object( type_id: i64, ptr: u32, len: u32, -) -> i32 { +) -> Result<(), WasmIndexerError> { let (idx_env, store) = env.data_and_store_mut(); + if idx_env + .kill_switch + .load(std::sync::atomic::Ordering::SeqCst) + { + // If the kill switch has been flipped, returning an error will cause an + // early termination of WASM execution. + return Err(WasmIndexerError::KillSwitch); + } + let mem = if let Some(memory) = idx_env.memory.as_mut() { memory.view(&store) } else { - return WasmIndexerError::UninitializedMemory as i32; + return Err(WasmIndexerError::UninitializedMemory); }; let mut bytes = Vec::with_capacity(len as usize); @@ -172,12 +207,12 @@ fn put_object( Ok(columns) => columns, Err(e) => { error!("Failed to deserialize Vec for put_object: {e:?}",); - return WasmIndexerError::DeserializationError as i32; + return Err(WasmIndexerError::DeserializationError); } }; let rt = tokio::runtime::Handle::current(); - rt.block_on(async { + let result = rt.block_on(async { idx_env .db .lock() @@ -186,19 +221,37 @@ fn put_object( .await }); - 0 + if let Err(e) = result { + error!("Failed to put_object: {e}"); + return Err(WasmIndexerError::DatabaseError); + }; + + Ok(()) } /// Execute the arbitrary query at the given pointer. /// /// This function is fallible, and will panic if the query cannot be executed. -fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32) -> i32 { +fn put_many_to_many_record( + mut env: FunctionEnvMut, + ptr: u32, + len: u32, +) -> Result<(), WasmIndexerError> { let (idx_env, store) = env.data_and_store_mut(); + if idx_env + .kill_switch + .load(std::sync::atomic::Ordering::SeqCst) + { + // If the kill switch has been flipped, returning an error will cause an + // early termination of WASM execution. + return Err(WasmIndexerError::KillSwitch); + } + let mem = if let Some(memory) = idx_env.memory.as_mut() { memory.view(&store) } else { - return WasmIndexerError::UninitializedMemory as i32; + return Err(WasmIndexerError::UninitializedMemory); }; let mut bytes = Vec::with_capacity(len as usize); @@ -212,12 +265,12 @@ fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32 Ok(queries) => queries.iter().map(|q| q.to_string()).collect(), Err(e) => { error!("Failed to deserialize queries: {e:?}"); - return WasmIndexerError::DeserializationError as i32; + return Err(WasmIndexerError::DeserializationError); } }; let rt = tokio::runtime::Handle::current(); - rt.block_on(async { + let result = rt.block_on(async { idx_env .db .lock() @@ -226,7 +279,12 @@ fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32 .await }); - 0 + if let Err(e) = result { + error!("Failed to put_many_to_many_record: {e:?}"); + return Err(WasmIndexerError::DatabaseError); + } + + Ok(()) } /// Get the exports for the given store and environment.