From 21e2e2f82fba7c9797526ec365e69b321b1bbadc Mon Sep 17 00:00:00 2001 From: Alexander Decurnou Date: Tue, 22 Aug 2023 10:21:26 -0400 Subject: [PATCH 01/20] Implement error codes for FFI WASM functions --- packages/fuel-indexer-lib/src/lib.rs | 54 ++++++++++++++++++ packages/fuel-indexer-plugin/src/wasm.rs | 72 ++++++++++++++++++++---- packages/fuel-indexer/src/ffi.rs | 50 ++++++++++------ 3 files changed, 148 insertions(+), 28 deletions(-) diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index cbc7ec340..127d29dd0 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -36,6 +36,60 @@ impl ExecutionSource { } } +#[derive(Debug, Clone, Copy)] +pub enum WasmIndexerError { + DeserializationError = 1, + SerializationError, + PutObjectError, + UnableToSaveListType, + UninitializedMemory, + UnableToFetchLogString, + GeneralError, +} + +impl From for WasmIndexerError { + fn from(value: i32) -> Self { + match value { + 0 => unreachable!("WasmIndexerError index starts at 1"), + 1 => Self::DeserializationError, + 2 => Self::SerializationError, + 3 => Self::PutObjectError, + 4 => Self::UnableToSaveListType, + 5 => Self::UninitializedMemory, + 6 => Self::UnableToFetchLogString, + _ => Self::GeneralError, + } + } +} + +impl std::fmt::Display for WasmIndexerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::SerializationError => { + write!(f, "Failed to serialize") + } + Self::DeserializationError => { + write!(f, "Failed to deserialize") + } + Self::UnableToSaveListType => { + write!(f, "Failed to save list") + } + Self::PutObjectError => { + write!(f, "Failed to save object") + } + Self::UninitializedMemory => { + write!(f, "Failed to create MemoryView for indexer") + } + Self::UnableToFetchLogString => { + write!(f, "Failed to fetch log string") + } + Self::GeneralError => write!(f, "A WASM error occurred"), + } + } +} + +impl std::error::Error for WasmIndexerError {} + /// Return a fully qualified indexer namespace. pub fn fully_qualified_namespace(namespace: &str, identifier: &str) -> String { format!("{}_{}", namespace, identifier) diff --git a/packages/fuel-indexer-plugin/src/wasm.rs b/packages/fuel-indexer-plugin/src/wasm.rs index 38c409a1f..889dc1114 100644 --- a/packages/fuel-indexer-plugin/src/wasm.rs +++ b/packages/fuel-indexer-plugin/src/wasm.rs @@ -4,6 +4,7 @@ use alloc::vec::Vec; use fuel_indexer_lib::{ graphql::MAX_FOREIGN_KEY_LIST_FIELDS, utils::{deserialize, serialize}, + WasmIndexerError, }; use fuel_indexer_schema::{ join::{JoinMetadata, RawQuery}, @@ -17,11 +18,14 @@ pub use sha2::{Digest, Sha256}; pub use std::collections::{HashMap, HashSet}; extern "C" { - // TODO: error codes? or just panic and let the runtime handle it? + // TODO: How do we want to return an error code for + // a function that returns a u32 but actually uses a u8? fn ff_get_object(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8; - fn ff_put_object(type_id: i64, ptr: *const u8, len: u32); - fn ff_put_many_to_many_record(ptr: *const u8, len: u32); + // log_data prints information to stdout. fn ff_log_data(ptr: *const u8, len: u32, log_level: u32); + // Put methods have error codes. + fn ff_put_object(type_id: i64, ptr: *const u8, len: u32) -> i32; + fn ff_put_many_to_many_record(ptr: *const u8, len: u32) -> i32; } // TODO: more to do here, hook up to 'impl log::Log for Logger' @@ -49,19 +53,29 @@ impl Logger { } } +/// Trait for a type entity. +/// +/// Any entity type that will be processed through a WASM indexer is required to implement this trait. pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { + /// Unique identifier for a type. const TYPE_ID: i64; + + /// Necessary metadata for saving an entity's list type fields. const JOIN_METADATA: Option<[Option>; MAX_FOREIGN_KEY_LIST_FIELDS]>; + /// Convert database row representation into an instance of an entity. fn from_row(vec: Vec) -> Self; + /// Convert an instance of an entity into row representation for use in a database. fn to_row(&self) -> Vec; + /// Returns an entity's internal type ID. fn type_id(&self) -> i64 { Self::TYPE_ID } - fn save_many_to_many(&self) { + /// Saves a record that contains a list of multiple elements. + fn save_many_to_many(&self) -> Result<(), WasmIndexerError> { if let Some(meta) = Self::JOIN_METADATA { let items = meta.iter().filter_map(|x| x.clone()).collect::>(); let row = self.to_row(); @@ -71,13 +85,34 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { .filter(|query| !query.is_empty()) .collect::>(); let bytes = serialize(&queries); - unsafe { ff_put_many_to_many_record(bytes.as_ptr(), bytes.len() as u32) } + unsafe { + let res = ff_put_many_to_many_record(bytes.as_ptr(), bytes.len() as u32); + + if res != 0 { + return Err(WasmIndexerError::UnableToSaveListType); + } + + Ok(()) + } + } else { + Ok(()) } } + /// Loads a record given a UID. fn load(id: UID) -> Option { + Self::load_unsafe(id).unwrap() + } + + /// Loads a record through the FFI with the WASM runtime and checks for errors. + fn load_unsafe(id: UID) -> Result, WasmIndexerError> { unsafe { - let buff = bincode::serialize(&id.to_string()).unwrap(); + let buff = if let Ok(bytes) = bincode::serialize(&id.to_string()) { + bytes + } else { + return Err(WasmIndexerError::SerializationError); + }; + let mut bufflen = (buff.len() as u32).to_le_bytes(); let ptr = ff_get_object(Self::TYPE_ID, buff.as_ptr(), bufflen.as_mut_ptr()); @@ -85,26 +120,40 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { if !ptr.is_null() { let len = u32::from_le_bytes(bufflen) as usize; let bytes = Vec::from_raw_parts(ptr, len, len); - let vec = deserialize(&bytes).expect("Bad serialization."); + let vec = if let Ok(v) = deserialize(&bytes) { + v + } else { + return Err(WasmIndexerError::DeserializationError); + }; - return Some(Self::from_row(vec)); + return Ok(Some(Self::from_row(vec))); } - None + Ok(None) } } + /// Saves a record. fn save(&self) { + self.save_unsafe().unwrap() + } + + /// Saves a record through the FFI with the WASM runtime and checks for errors. + fn save_unsafe(&self) -> Result<(), WasmIndexerError> { unsafe { let buf = serialize(&self.to_row()); - ff_put_object(Self::TYPE_ID, buf.as_ptr(), buf.len() as u32) + let res = ff_put_object(Self::TYPE_ID, buf.as_ptr(), buf.len() as u32); + if res != 0 { + return Err(WasmIndexerError::from(res)); + } } - self.save_many_to_many(); + self.save_many_to_many() } } #[no_mangle] +/// Allocation function to be called by an executor in a WASM runtime. fn alloc_fn(size: u32) -> *const u8 { let vec = Vec::with_capacity(size as usize); let ptr = vec.as_ptr(); @@ -115,6 +164,7 @@ fn alloc_fn(size: u32) -> *const u8 { } #[no_mangle] +/// Deallocation function to be called by an executor in a WASM runtime. fn dealloc_fn(ptr: *mut u8, len: usize) { let _vec = unsafe { Vec::from_raw_parts(ptr, len, len) }; } diff --git a/packages/fuel-indexer/src/ffi.rs b/packages/fuel-indexer/src/ffi.rs index 0beecd840..6dc880840 100644 --- a/packages/fuel-indexer/src/ffi.rs +++ b/packages/fuel-indexer/src/ffi.rs @@ -1,5 +1,5 @@ use async_std::sync::MutexGuard; -use fuel_indexer_lib::defaults; +use fuel_indexer_lib::{defaults, WasmIndexerError}; use fuel_indexer_schema::{join::RawQuery, FtColumn}; use fuel_indexer_types::ffi::{ LOG_LEVEL_DEBUG, LOG_LEVEL_ERROR, LOG_LEVEL_INFO, LOG_LEVEL_TRACE, LOG_LEVEL_WARN, @@ -147,13 +147,19 @@ fn get_object( /// Put the given type at the given pointer into memory. /// /// This function is fallible, and will panic if the type cannot be saved. -fn put_object(mut env: FunctionEnvMut, type_id: i64, ptr: u32, len: u32) { +fn put_object( + mut env: FunctionEnvMut, + type_id: i64, + ptr: u32, + len: u32, +) -> i32 { let (idx_env, store) = env.data_and_store_mut(); - let mem = idx_env - .memory - .as_mut() - .expect("Memory unitialized") - .view(&store); + + let mem = if let Some(memory) = idx_env.memory.as_mut() { + memory.view(&store) + } else { + return WasmIndexerError::UninitializedMemory as i32; + }; let mut bytes = Vec::with_capacity(len as usize); let range = ptr as usize..ptr as usize + len as usize; @@ -166,7 +172,7 @@ fn put_object(mut env: FunctionEnvMut, type_id: i64, ptr: u32, len: u3 Ok(columns) => columns, Err(e) => { error!("Failed to deserialize Vec for put_object: {e:?}",); - return; + return WasmIndexerError::DeserializationError as i32; } }; @@ -179,18 +185,21 @@ fn put_object(mut env: FunctionEnvMut, type_id: i64, ptr: u32, len: u3 .put_object(type_id, columns, bytes) .await }); + + 0 } /// Execute the arbitrary query at the given pointer. /// /// This function is fallible, and will panic if the query cannot be executed. -fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32) { +fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32) -> i32 { let (idx_env, store) = env.data_and_store_mut(); - let mem = idx_env - .memory - .as_mut() - .expect("Memory unitialized") - .view(&store); + + let mem = if let Some(memory) = idx_env.memory.as_mut() { + memory.view(&store) + } else { + return WasmIndexerError::UninitializedMemory as i32; + }; let mut bytes = Vec::with_capacity(len as usize); let range = ptr as usize..ptr as usize + len as usize; @@ -199,9 +208,14 @@ fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32 bytes.extend_from_slice(&mem.data_unchecked()[range]); } - let queries: Vec = - bincode::deserialize(&bytes).expect("Failed to deserialize queries"); - let queries = queries.iter().map(|q| q.to_string()).collect::>(); + let queries: Vec = match bincode::deserialize::>(&bytes) { + Ok(queries) => queries.iter().map(|q| q.to_string()).collect(), + Err(e) => { + error!("Failed to deserialize queries: {e:?}"); + return WasmIndexerError::DeserializationError as i32; + } + }; + let rt = tokio::runtime::Handle::current(); rt.block_on(async { idx_env @@ -211,6 +225,8 @@ fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32 .put_many_to_many_record(queries) .await }); + + 0 } /// Get the exports for the given store and environment. From e74645e0d418db7834e8a271ad50edcaaf055624 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Wed, 6 Sep 2023 16:57:05 +0200 Subject: [PATCH 02/20] early exit and kill switch --- packages/fuel-indexer-benchmarks/src/lib.rs | 4 +- packages/fuel-indexer-lib/src/lib.rs | 10 +++ packages/fuel-indexer-macros/src/decoder.rs | 4 +- packages/fuel-indexer-plugin/src/wasm.rs | 25 ++---- packages/fuel-indexer-tests/tests/service.rs | 4 +- packages/fuel-indexer/src/database.rs | 59 +++++++------- packages/fuel-indexer/src/executor.rs | 40 ++++++--- packages/fuel-indexer/src/ffi.rs | 86 ++++++++++++++++---- 8 files changed, 156 insertions(+), 76 deletions(-) diff --git a/packages/fuel-indexer-benchmarks/src/lib.rs b/packages/fuel-indexer-benchmarks/src/lib.rs index 22eafb10f..899341682 100644 --- a/packages/fuel-indexer-benchmarks/src/lib.rs +++ b/packages/fuel-indexer-benchmarks/src/lib.rs @@ -7,7 +7,7 @@ use fuel_indexer::{ use fuel_indexer_database::IndexerConnectionPool; use fuel_indexer_lib::config::DatabaseConfig; use fuel_indexer_tests::fixtures::TestPostgresDb; -use std::{str::FromStr, sync::atomic::AtomicBool}; +use std::{str::FromStr, sync::atomic::AtomicBool, sync::Arc}; /// Location of Fuel node to be used for block retrieval. pub const NODE_URL: &str = "beta-4.fuel.network:80"; @@ -40,6 +40,7 @@ async fn setup_wasm_executor( db_url: String, pool: IndexerConnectionPool, ) -> Result { + let kill_switch = Arc::new(AtomicBool::new(false)); config.database = DatabaseConfig::from_str(&db_url).unwrap(); let schema_version = manifest .graphql_schema_content() @@ -52,6 +53,7 @@ async fn setup_wasm_executor( manifest.module_bytes().unwrap(), pool, schema_version, + kill_switch, ) .await .expect("Could not setup WASM executor"); diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index 127d29dd0..82b9bf12e 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -44,6 +44,8 @@ pub enum WasmIndexerError { UnableToSaveListType, UninitializedMemory, UnableToFetchLogString, + KillSwitch, + DatabaseError, GeneralError, } @@ -57,6 +59,8 @@ impl From for WasmIndexerError { 4 => Self::UnableToSaveListType, 5 => Self::UninitializedMemory, 6 => Self::UnableToFetchLogString, + 7 => Self::KillSwitch, + 8 => Self::DatabaseError, _ => Self::GeneralError, } } @@ -83,6 +87,12 @@ impl std::fmt::Display for WasmIndexerError { Self::UnableToFetchLogString => { write!(f, "Failed to fetch log string") } + Self::KillSwitch => { + write!(f, "Kill switch has been triggered") + } + Self::DatabaseError => { + write!(f, "Failed performing a database operation") + } Self::GeneralError => write!(f, "A WASM error occurred"), } } diff --git a/packages/fuel-indexer-macros/src/decoder.rs b/packages/fuel-indexer-macros/src/decoder.rs index f360976be..88bcfdb81 100644 --- a/packages/fuel-indexer-macros/src/decoder.rs +++ b/packages/fuel-indexer-macros/src/decoder.rs @@ -833,7 +833,7 @@ impl From for TokenStream { .map(|query| query.to_string()) .collect::>(); - d.lock().await.put_many_to_many_record(queries).await; + d.lock().await.put_many_to_many_record(queries).await.unwrap(); } } None => {} @@ -868,7 +868,7 @@ impl From for TokenStream { Self::TYPE_ID, self.to_row(), serialize(&self.to_row()) - ).await; + ).await.unwrap(); } None => {}, } diff --git a/packages/fuel-indexer-plugin/src/wasm.rs b/packages/fuel-indexer-plugin/src/wasm.rs index 889dc1114..3fd726016 100644 --- a/packages/fuel-indexer-plugin/src/wasm.rs +++ b/packages/fuel-indexer-plugin/src/wasm.rs @@ -24,8 +24,8 @@ extern "C" { // log_data prints information to stdout. fn ff_log_data(ptr: *const u8, len: u32, log_level: u32); // Put methods have error codes. - fn ff_put_object(type_id: i64, ptr: *const u8, len: u32) -> i32; - fn ff_put_many_to_many_record(ptr: *const u8, len: u32) -> i32; + fn ff_put_object(type_id: i64, ptr: *const u8, len: u32); + fn ff_put_many_to_many_record(ptr: *const u8, len: u32); } // TODO: more to do here, hook up to 'impl log::Log for Logger' @@ -75,7 +75,7 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { } /// Saves a record that contains a list of multiple elements. - fn save_many_to_many(&self) -> Result<(), WasmIndexerError> { + fn save_many_to_many(&self) { if let Some(meta) = Self::JOIN_METADATA { let items = meta.iter().filter_map(|x| x.clone()).collect::>(); let row = self.to_row(); @@ -86,16 +86,8 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { .collect::>(); let bytes = serialize(&queries); unsafe { - let res = ff_put_many_to_many_record(bytes.as_ptr(), bytes.len() as u32); - - if res != 0 { - return Err(WasmIndexerError::UnableToSaveListType); - } - - Ok(()) + ff_put_many_to_many_record(bytes.as_ptr(), bytes.len() as u32); } - } else { - Ok(()) } } @@ -135,17 +127,14 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { /// Saves a record. fn save(&self) { - self.save_unsafe().unwrap() + self.save_unsafe() } /// Saves a record through the FFI with the WASM runtime and checks for errors. - fn save_unsafe(&self) -> Result<(), WasmIndexerError> { + fn save_unsafe(&self) { unsafe { let buf = serialize(&self.to_row()); - let res = ff_put_object(Self::TYPE_ID, buf.as_ptr(), buf.len() as u32); - if res != 0 { - return Err(WasmIndexerError::from(res)); - } + ff_put_object(Self::TYPE_ID, buf.as_ptr(), buf.len() as u32); } self.save_many_to_many() diff --git a/packages/fuel-indexer-tests/tests/service.rs b/packages/fuel-indexer-tests/tests/service.rs index 20a306aff..3dda8bdda 100644 --- a/packages/fuel-indexer-tests/tests/service.rs +++ b/packages/fuel-indexer-tests/tests/service.rs @@ -3,7 +3,7 @@ use fuel_indexer::{Executor, IndexerConfig, WasmIndexExecutor}; use fuel_indexer_lib::{config::DatabaseConfig, manifest::Manifest}; use fuel_indexer_tests::fixtures::TestPostgresDb; use std::str::FromStr; -use std::sync::atomic::AtomicBool; +use std::sync::{atomic::AtomicBool, Arc}; #[tokio::test] async fn test_wasm_executor_can_meter_execution() { @@ -48,12 +48,14 @@ async fn test_wasm_executor_can_meter_execution() { .version() .to_string(); + let kill_switch = Arc::new(AtomicBool::new(false)); let mut executor = WasmIndexExecutor::new( &config, &manifest, bytes.clone(), pool, schema_version, + kill_switch, ) .await .unwrap(); diff --git a/packages/fuel-indexer/src/database.rs b/packages/fuel-indexer/src/database.rs index 242d0deed..c0a6c91d7 100644 --- a/packages/fuel-indexer/src/database.rs +++ b/packages/fuel-indexer/src/database.rs @@ -64,29 +64,28 @@ impl Database { let conn = self.pool.acquire().await?; self.stashed = Some(conn); debug!("Connection stashed as: {:?}", self.stashed); - let conn = self.stashed.as_mut().expect( - "No stashed connection for start transaction. Was a transaction started?", - ); + let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( + "No stashed connection for start transaction. Was a transaction started?" + .to_string(), + ))?; let result = queries::start_transaction(conn).await?; Ok(result) } /// Commit transaction to database. pub async fn commit_transaction(&mut self) -> IndexerResult { - let conn = self - .stashed - .as_mut() - .expect("No stashed connection for commit. Was a transaction started?"); + let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( + "No stashed connection for commit. Was a transaction started?".to_string(), + ))?; let res = queries::commit_transaction(conn).await?; Ok(res) } /// Revert open transaction. pub async fn revert_transaction(&mut self) -> IndexerResult { - let conn = self - .stashed - .as_mut() - .expect("No stashed connection for revert. Was a transaction started?"); + let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( + "No stashed connection for revert. Was a transaction started?".to_string(), + ))?; let res = queries::revert_transaction(conn).await?; Ok(res) } @@ -132,11 +131,11 @@ impl Database { type_id: i64, columns: Vec, bytes: Vec, - ) { + ) -> IndexerResult<()> { let table = match self.tables.get(&type_id) { Some(t) => t, None => { - error!( + return Err(crate::IndexerError::Unknown(format!( r#"TypeId({type_id}) not found in tables: {:?}. Does the schema version in SchemaManager::new_schema match the schema version in Database::load_schema? @@ -145,8 +144,7 @@ Do your WASM modules need to be rebuilt? "#, self.tables, - ); - return; + ))); } }; @@ -162,18 +160,17 @@ Do your WASM modules need to be rebuilt? let query_text = format_sql_query(self.upsert_query(table, &columns, inserts, updates)); - let conn = self - .stashed - .as_mut() - .expect("No stashed connection for put. Was a transaction started?"); + let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( + "No stashed connection for put. Was a transaction started?".to_string(), + ))?; if self.config.verbose { info!("{query_text}"); } - if let Err(e) = queries::put_object(conn, query_text, bytes).await { - error!("Failed to put_object: {e:?}"); - } + queries::put_object(conn, query_text, bytes).await?; + + Ok(()) } /// Get an object from the database. @@ -261,20 +258,22 @@ Do your WASM modules need to be rebuilt? /// /// There are multiple queries here because a single parent `TypeDefinition` can have several /// many-to-many relationships with children `TypeDefinition`s. - pub async fn put_many_to_many_record(&mut self, queries: Vec) { - let conn = self - .stashed - .as_mut() - .expect("No stashed connection for put. Was a transaction started?"); + pub async fn put_many_to_many_record( + &mut self, + queries: Vec, + ) -> IndexerResult<()> { + let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( + "No stashed connection for put. Was a transaction started?".to_string(), + ))?; for query in queries { if self.config.verbose { info!("{query}"); } - if let Err(e) = queries::put_many_to_many_record(conn, query).await { - error!("Failed to put_many_to_many_record: {e:?}"); - } + queries::put_many_to_many_record(conn, query).await?; } + + Ok(()) } } diff --git a/packages/fuel-indexer/src/executor.rs b/packages/fuel-indexer/src/executor.rs index 67267494d..914a28c61 100644 --- a/packages/fuel-indexer/src/executor.rs +++ b/packages/fuel-indexer/src/executor.rs @@ -16,7 +16,9 @@ use fuel_core_client::client::{ FuelClient, }; use fuel_indexer_database::IndexerConnectionPool; -use fuel_indexer_lib::{defaults::*, manifest::Manifest, utils::serialize}; +use fuel_indexer_lib::{ + defaults::*, manifest::Manifest, utils::serialize, WasmIndexerError, +}; use fuel_indexer_types::{ fuel::{field::*, *}, scalar::{Bytes32, HexString}, @@ -27,6 +29,7 @@ use fuel_vm::state::ProgramState as ClientProgramState; use futures::Future; use itertools::Itertools; use std::{ + error::Error, marker::{Send, Sync}, path::Path, str::FromStr, @@ -187,6 +190,12 @@ pub fn run_executor( .handle_events(kill_switch.clone(), block_info) .await; + // If the kill switch has been triggered, the executor exits early. + if kill_switch.load(Ordering::SeqCst) { + info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); + break; + } + if let Err(e) = result { // Run time metering is deterministic. There is no point in retrying. if let IndexerError::RunTimeLimitExceededError = e { @@ -228,12 +237,6 @@ pub fn run_executor( // If we make it this far, we always go to the next page. cursor = next_cursor; - // Again, check if something else has signaled that this indexer should stop, then stop. - if kill_switch.load(Ordering::SeqCst) { - info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); - break; - } - // Since we had successful call, we reset the retry count. consecutive_retries = 0; } @@ -501,6 +504,10 @@ pub struct IndexEnv { /// Reference to the connected database. pub db: Arc>, + + /// Kill switch for this indexer. When true, the indexer service indicated + /// that the indexer is being terminated. + pub kill_switch: Arc, } impl IndexEnv { @@ -509,6 +516,7 @@ impl IndexEnv { pool: IndexerConnectionPool, manifest: &Manifest, config: &IndexerConfig, + kill_switch: Arc, ) -> IndexerResult { let db = Database::new(pool, manifest, config).await; Ok(IndexEnv { @@ -516,6 +524,7 @@ impl IndexEnv { alloc: None, dealloc: None, db: Arc::new(Mutex::new(db)), + kill_switch, }) } } @@ -649,6 +658,7 @@ impl WasmIndexExecutor { wasm_bytes: impl AsRef<[u8]>, pool: IndexerConnectionPool, schema_version: String, + kill_switch: Arc, ) -> IndexerResult { let mut compiler_config = Cranelift::new(); @@ -661,7 +671,7 @@ impl WasmIndexExecutor { compiler_config.push_middleware(metering); } - let idx_env = IndexEnv::new(pool, manifest, config).await?; + let idx_env = IndexEnv::new(pool, manifest, config, kill_switch).await?; let db: Arc> = idx_env.db.clone(); @@ -730,12 +740,13 @@ impl WasmIndexExecutor { p: impl AsRef, config: Option, pool: IndexerConnectionPool, + kill_switch: Arc, ) -> IndexerResult { let config = config.unwrap_or_default(); let manifest = Manifest::from_file(p)?; let bytes = manifest.module_bytes()?; let schema_version = manifest.graphql_schema_content()?.version().to_string(); - Self::new(&config, &manifest, bytes, pool, schema_version).await + Self::new(&config, &manifest, bytes, pool, schema_version, kill_switch).await } /// Create a new `WasmIndexExecutor`. @@ -762,6 +773,7 @@ impl WasmIndexExecutor { bytes.clone(), pool, schema_version, + killer.clone(), ) .await { @@ -794,6 +806,7 @@ impl WasmIndexExecutor { bytes, pool, schema_version, + killer.clone(), ) .await { @@ -917,7 +930,14 @@ impl Executor for WasmIndexExecutor { self.db.lock().await.revert_transaction().await?; return Err(IndexerError::RunTimeLimitExceededError); } else { - error!("WasmIndexExecutor({uid}) WASM execution failed: {e:?}."); + if let Some(e) = e + .source() + .and_then(|e| e.downcast_ref::()) + { + error!("Indexer({uid}) WASM execution failed: {e}."); + } else { + error!("Indexer({uid}) WASM execution failed: {e:?}."); + }; self.db.lock().await.revert_transaction().await?; return Err(IndexerError::from(e)); } diff --git a/packages/fuel-indexer/src/ffi.rs b/packages/fuel-indexer/src/ffi.rs index 6dc880840..59a1121c7 100644 --- a/packages/fuel-indexer/src/ffi.rs +++ b/packages/fuel-indexer/src/ffi.rs @@ -70,8 +70,23 @@ fn get_object_id(mem: &MemoryView, ptr: u32, len: u32) -> FFIResult { } /// Log the string at the given pointer to stdout. -fn log_data(mut env: FunctionEnvMut, ptr: u32, len: u32, log_level: u32) { +fn log_data( + mut env: FunctionEnvMut, + ptr: u32, + len: u32, + log_level: u32, +) -> Result<(), WasmIndexerError> { let (idx_env, store) = env.data_and_store_mut(); + + if idx_env + .kill_switch + .load(std::sync::atomic::Ordering::SeqCst) + { + // If the kill switch has been flipped, returning an error will cause an + // early termination of WASM execution. + return Err(WasmIndexerError::KillSwitch); + } + let mem = idx_env .memory .as_mut() @@ -89,6 +104,8 @@ fn log_data(mut env: FunctionEnvMut, ptr: u32, len: u32, log_level: u3 LOG_LEVEL_TRACE => trace!("{log_string}",), l => panic!("Invalid log level: {l}"), } + + Ok(()) } /// Fetch the given type at the given pointer from memory. @@ -99,9 +116,18 @@ fn get_object( type_id: i64, ptr: u32, len_ptr: u32, -) -> u32 { +) -> Result { let (idx_env, mut store) = env.data_and_store_mut(); + if idx_env + .kill_switch + .load(std::sync::atomic::Ordering::SeqCst) + { + // If the kill switch has been flipped, returning an error will cause an + // early termination of WASM execution. + return Err(WasmIndexerError::KillSwitch); + } + let mem = idx_env .memory .as_mut() @@ -138,9 +164,9 @@ fn get_object( mem.data_unchecked_mut()[range].copy_from_slice(&bytes); } - result + Ok(result) } else { - 0 + Ok(0) } } @@ -152,13 +178,22 @@ fn put_object( type_id: i64, ptr: u32, len: u32, -) -> i32 { +) -> Result<(), WasmIndexerError> { let (idx_env, store) = env.data_and_store_mut(); + if idx_env + .kill_switch + .load(std::sync::atomic::Ordering::SeqCst) + { + // If the kill switch has been flipped, returning an error will cause an + // early termination of WASM execution. + return Err(WasmIndexerError::KillSwitch); + } + let mem = if let Some(memory) = idx_env.memory.as_mut() { memory.view(&store) } else { - return WasmIndexerError::UninitializedMemory as i32; + return Err(WasmIndexerError::UninitializedMemory); }; let mut bytes = Vec::with_capacity(len as usize); @@ -172,12 +207,12 @@ fn put_object( Ok(columns) => columns, Err(e) => { error!("Failed to deserialize Vec for put_object: {e:?}",); - return WasmIndexerError::DeserializationError as i32; + return Err(WasmIndexerError::DeserializationError); } }; let rt = tokio::runtime::Handle::current(); - rt.block_on(async { + let result = rt.block_on(async { idx_env .db .lock() @@ -186,19 +221,37 @@ fn put_object( .await }); - 0 + if let Err(e) = result { + error!("Failed to put_object: {e}"); + return Err(WasmIndexerError::DatabaseError); + }; + + Ok(()) } /// Execute the arbitrary query at the given pointer. /// /// This function is fallible, and will panic if the query cannot be executed. -fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32) -> i32 { +fn put_many_to_many_record( + mut env: FunctionEnvMut, + ptr: u32, + len: u32, +) -> Result<(), WasmIndexerError> { let (idx_env, store) = env.data_and_store_mut(); + if idx_env + .kill_switch + .load(std::sync::atomic::Ordering::SeqCst) + { + // If the kill switch has been flipped, returning an error will cause an + // early termination of WASM execution. + return Err(WasmIndexerError::KillSwitch); + } + let mem = if let Some(memory) = idx_env.memory.as_mut() { memory.view(&store) } else { - return WasmIndexerError::UninitializedMemory as i32; + return Err(WasmIndexerError::UninitializedMemory); }; let mut bytes = Vec::with_capacity(len as usize); @@ -212,12 +265,12 @@ fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32 Ok(queries) => queries.iter().map(|q| q.to_string()).collect(), Err(e) => { error!("Failed to deserialize queries: {e:?}"); - return WasmIndexerError::DeserializationError as i32; + return Err(WasmIndexerError::DeserializationError); } }; let rt = tokio::runtime::Handle::current(); - rt.block_on(async { + let result = rt.block_on(async { idx_env .db .lock() @@ -226,7 +279,12 @@ fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32 .await }); - 0 + if let Err(e) = result { + error!("Failed to put_many_to_many_record: {e:?}"); + return Err(WasmIndexerError::DatabaseError); + } + + Ok(()) } /// Get the exports for the given store and environment. From 92e0139d245760c5266db64aa80658fba932942a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Mon, 11 Sep 2023 18:29:05 +0200 Subject: [PATCH 03/20] review feedback --- packages/fuel-indexer-api-server/src/uses.rs | 6 +++ packages/fuel-indexer-macros/src/decoder.rs | 11 ++-- packages/fuel-indexer-plugin/src/wasm.rs | 6 +-- packages/fuel-indexer/src/database.rs | 57 ++++++++++++-------- packages/fuel-indexer/src/executor.rs | 10 +++- packages/fuel-indexer/src/ffi.rs | 8 ++- packages/fuel-indexer/src/lib.rs | 4 +- 7 files changed, 66 insertions(+), 36 deletions(-) diff --git a/packages/fuel-indexer-api-server/src/uses.rs b/packages/fuel-indexer-api-server/src/uses.rs index d1b25e79b..ec46aaec7 100644 --- a/packages/fuel-indexer-api-server/src/uses.rs +++ b/packages/fuel-indexer-api-server/src/uses.rs @@ -180,6 +180,12 @@ pub(crate) async fn remove_indexer( })) .await?; + // We have early termination on a kill switch. Yet, it is still possible for + // the database entries to be removed before the indexer has time to act on + // kill switch being triggered. Adding a small delay here should prevent + // unnecessary DatabaseError's appearing in the logs. + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // Allways remove data when removing an indexer. if let Err(e) = queries::remove_indexer(&mut conn, &namespace, &identifier, true).await diff --git a/packages/fuel-indexer-macros/src/decoder.rs b/packages/fuel-indexer-macros/src/decoder.rs index 88bcfdb81..d875e3f80 100644 --- a/packages/fuel-indexer-macros/src/decoder.rs +++ b/packages/fuel-indexer-macros/src/decoder.rs @@ -833,7 +833,7 @@ impl From for TokenStream { .map(|query| query.to_string()) .collect::>(); - d.lock().await.put_many_to_many_record(queries).await.unwrap(); + d.lock().await.put_many_to_many_record(queries).await.expect(&format!("Entity::save_many_to_many for {} failed.", stringify!(#ident))); } } None => {} @@ -846,12 +846,15 @@ impl From for TokenStream { match &db { Some(d) => { match d.lock().await.get_object(Self::TYPE_ID, id.to_string()).await { - Some(bytes) => { + Ok(Some(bytes)) => { let columns: Vec = bincode::deserialize(&bytes).expect("Failed to deserialize Vec for Entity::load."); let obj = Self::from_row(columns); Some(obj) }, - None => None, + Ok(None) => None, + Err(e) => { + panic!("Entity::load for {} failed.", stringify!(#ident)) + } } } None => None, @@ -868,7 +871,7 @@ impl From for TokenStream { Self::TYPE_ID, self.to_row(), serialize(&self.to_row()) - ).await.unwrap(); + ).await.expect(&format!("Entity::save for {} failed.", stringify!(#ident))); } None => {}, } diff --git a/packages/fuel-indexer-plugin/src/wasm.rs b/packages/fuel-indexer-plugin/src/wasm.rs index 3fd726016..2d2829cf0 100644 --- a/packages/fuel-indexer-plugin/src/wasm.rs +++ b/packages/fuel-indexer-plugin/src/wasm.rs @@ -17,13 +17,11 @@ pub use hex::FromHex; pub use sha2::{Digest, Sha256}; pub use std::collections::{HashMap, HashSet}; +// All these methods have return type `Result`. `wasmer` +// uses the Err variant for ealy exit. extern "C" { - // TODO: How do we want to return an error code for - // a function that returns a u32 but actually uses a u8? fn ff_get_object(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8; - // log_data prints information to stdout. fn ff_log_data(ptr: *const u8, len: u32, log_level: u32); - // Put methods have error codes. fn ff_put_object(type_id: i64, ptr: *const u8, len: u32); fn ff_put_many_to_many_record(ptr: *const u8, len: u32); } diff --git a/packages/fuel-indexer/src/database.rs b/packages/fuel-indexer/src/database.rs index c0a6c91d7..50a92551a 100644 --- a/packages/fuel-indexer/src/database.rs +++ b/packages/fuel-indexer/src/database.rs @@ -1,4 +1,4 @@ -use crate::{IndexerConfig, IndexerResult, Manifest}; +use crate::{IndexerConfig, IndexerError, IndexerResult, Manifest}; use fuel_indexer_database::{queries, IndexerConnection, IndexerConnectionPool}; use fuel_indexer_lib::{ fully_qualified_namespace, graphql::types::IdCol, utils::format_sql_query, @@ -64,28 +64,36 @@ impl Database { let conn = self.pool.acquire().await?; self.stashed = Some(conn); debug!("Connection stashed as: {:?}", self.stashed); - let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( - "No stashed connection for start transaction. Was a transaction started?" - .to_string(), - ))?; + let conn = + self.stashed + .as_mut() + .ok_or(crate::IndexerError::NoTransactionError( + "start_transaction".to_string(), + ))?; let result = queries::start_transaction(conn).await?; Ok(result) } /// Commit transaction to database. pub async fn commit_transaction(&mut self) -> IndexerResult { - let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( - "No stashed connection for commit. Was a transaction started?".to_string(), - ))?; + let conn = + self.stashed + .as_mut() + .ok_or(crate::IndexerError::NoTransactionError( + "commit_transaction".to_string(), + ))?; let res = queries::commit_transaction(conn).await?; Ok(res) } /// Revert open transaction. pub async fn revert_transaction(&mut self) -> IndexerResult { - let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( - "No stashed connection for revert. Was a transaction started?".to_string(), - ))?; + let conn = + self.stashed + .as_mut() + .ok_or(crate::IndexerError::NoTransactionError( + "revert_transaction".to_string(), + ))?; let res = queries::revert_transaction(conn).await?; Ok(res) } @@ -135,7 +143,7 @@ impl Database { let table = match self.tables.get(&type_id) { Some(t) => t, None => { - return Err(crate::IndexerError::Unknown(format!( + return Err(IndexerError::Unknown(format!( r#"TypeId({type_id}) not found in tables: {:?}. Does the schema version in SchemaManager::new_schema match the schema version in Database::load_schema? @@ -160,9 +168,10 @@ Do your WASM modules need to be rebuilt? let query_text = format_sql_query(self.upsert_query(table, &columns, inserts, updates)); - let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( - "No stashed connection for put. Was a transaction started?".to_string(), - ))?; + let conn = self + .stashed + .as_mut() + .ok_or(IndexerError::NoTransactionError("put_object".to_string()))?; if self.config.verbose { info!("{query_text}"); @@ -178,23 +187,22 @@ Do your WASM modules need to be rebuilt? &mut self, type_id: i64, object_id: String, - ) -> Option> { + ) -> IndexerResult>> { let table = &self.tables[&type_id]; let query = self.get_query(table, &object_id); let conn = self .stashed .as_mut() - .expect("No stashed connection for get. Was a transaction started?"); - + .ok_or(IndexerError::NoTransactionError("get_object".to_string()))?; match queries::get_object(conn, query).await { - Ok(v) => Some(v), + Ok(v) => Ok(Some(v)), Err(e) => { if let sqlx::Error::RowNotFound = e { debug!("Row not found for object ID: {object_id}"); } else { error!("Failed to get_object: {e:?}"); } - None + Ok(None) } } } @@ -262,9 +270,12 @@ Do your WASM modules need to be rebuilt? &mut self, queries: Vec, ) -> IndexerResult<()> { - let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( - "No stashed connection for put. Was a transaction started?".to_string(), - ))?; + let conn = self + .stashed + .as_mut() + .ok_or(IndexerError::NoTransactionError( + "put_many_to_many_record".to_string(), + ))?; for query in queries { if self.config.verbose { diff --git a/packages/fuel-indexer/src/executor.rs b/packages/fuel-indexer/src/executor.rs index 914a28c61..250b3a6cd 100644 --- a/packages/fuel-indexer/src/executor.rs +++ b/packages/fuel-indexer/src/executor.rs @@ -934,7 +934,15 @@ impl Executor for WasmIndexExecutor { .source() .and_then(|e| e.downcast_ref::()) { - error!("Indexer({uid}) WASM execution failed: {e}."); + match e { + // Termination due to kill switch is an expected behavior. + WasmIndexerError::KillSwitch => { + info!("Indexer({uid}) WASM execution terminated: {e}.") + } + _ => { + error!("Indexer({uid}) WASM execution failed: {e}.") + } + } } else { error!("Indexer({uid}) WASM execution failed: {e:?}."); }; diff --git a/packages/fuel-indexer/src/ffi.rs b/packages/fuel-indexer/src/ffi.rs index 59a1121c7..ee57ad58e 100644 --- a/packages/fuel-indexer/src/ffi.rs +++ b/packages/fuel-indexer/src/ffi.rs @@ -140,8 +140,12 @@ fn get_object( let id = get_object_id(&mem, ptr + offset, len + padding + offset).unwrap(); let rt = tokio::runtime::Handle::current(); - let bytes = - rt.block_on(async { idx_env.db.lock().await.get_object(type_id, id).await }); + let bytes = rt + .block_on(async { idx_env.db.lock().await.get_object(type_id, id).await }) + .map_err(|e| { + error!("Failed to get_object: {e}"); + WasmIndexerError::DatabaseError + })?; if let Some(bytes) = bytes { let alloc_fn = idx_env.alloc.as_mut().expect("Alloc export is missing."); diff --git a/packages/fuel-indexer/src/lib.rs b/packages/fuel-indexer/src/lib.rs index bb1554f7d..d02050b8a 100644 --- a/packages/fuel-indexer/src/lib.rs +++ b/packages/fuel-indexer/src/lib.rs @@ -68,8 +68,8 @@ pub enum IndexerError { HandlerError, #[error("Invalid port {0:?}")] InvalidPortNumber(#[from] core::num::ParseIntError), - #[error("No transaction is open.")] - NoTransactionError, + #[error("No open transaction for {0}. Was a transaction started?")] + NoTransactionError(String), #[error("{0}.")] Unknown(String), #[error("Indexer schema error: {0:?}")] From 1b2318d23ee4046becf0681cf3662c6d74bd532e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Tue, 12 Sep 2023 10:34:26 +0200 Subject: [PATCH 04/20] update comment --- packages/fuel-indexer-plugin/src/wasm.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/fuel-indexer-plugin/src/wasm.rs b/packages/fuel-indexer-plugin/src/wasm.rs index 2d2829cf0..c246fab6d 100644 --- a/packages/fuel-indexer-plugin/src/wasm.rs +++ b/packages/fuel-indexer-plugin/src/wasm.rs @@ -17,8 +17,9 @@ pub use hex::FromHex; pub use sha2::{Digest, Sha256}; pub use std::collections::{HashMap, HashSet}; -// All these methods have return type `Result`. `wasmer` -// uses the Err variant for ealy exit. +// These are instantiated with functions which return +// `Result`. `wasmer` unwraps the `Result` and uses the +// `Err` variant for ealy exit. extern "C" { fn ff_get_object(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8; fn ff_log_data(ptr: *const u8, len: u32, log_level: u32); From 712b4891df9aa0261736dc6a53b2fc84403b7e94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Tue, 12 Sep 2023 11:14:22 +0200 Subject: [PATCH 05/20] add early_exit FFI function and handle errors in load --- packages/fuel-indexer-plugin/src/wasm.rs | 22 ++++++++++++---------- packages/fuel-indexer/src/ffi.rs | 8 ++++++++ 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/packages/fuel-indexer-plugin/src/wasm.rs b/packages/fuel-indexer-plugin/src/wasm.rs index c246fab6d..a42a18b24 100644 --- a/packages/fuel-indexer-plugin/src/wasm.rs +++ b/packages/fuel-indexer-plugin/src/wasm.rs @@ -25,6 +25,7 @@ extern "C" { fn ff_log_data(ptr: *const u8, len: u32, log_level: u32); fn ff_put_object(type_id: i64, ptr: *const u8, len: u32); fn ff_put_many_to_many_record(ptr: *const u8, len: u32); + fn ff_early_exit(err_code: i32); } // TODO: more to do here, hook up to 'impl log::Log for Logger' @@ -92,16 +93,17 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { /// Loads a record given a UID. fn load(id: UID) -> Option { - Self::load_unsafe(id).unwrap() + Self::load_unsafe(id) } /// Loads a record through the FFI with the WASM runtime and checks for errors. - fn load_unsafe(id: UID) -> Result, WasmIndexerError> { + fn load_unsafe(id: UID) -> Option { unsafe { let buff = if let Ok(bytes) = bincode::serialize(&id.to_string()) { bytes } else { - return Err(WasmIndexerError::SerializationError); + ff_early_exit(WasmIndexerError::SerializationError as i32); + unreachable!(); }; let mut bufflen = (buff.len() as u32).to_le_bytes(); @@ -111,16 +113,16 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { if !ptr.is_null() { let len = u32::from_le_bytes(bufflen) as usize; let bytes = Vec::from_raw_parts(ptr, len, len); - let vec = if let Ok(v) = deserialize(&bytes) { - v - } else { - return Err(WasmIndexerError::DeserializationError); + match deserialize(&bytes) { + Ok(vec) => Some(Self::from_row(vec)), + Err(_) => { + ff_early_exit(WasmIndexerError::DeserializationError as i32); + unreachable!() + } }; - - return Ok(Some(Self::from_row(vec))); } - Ok(None) + None } } diff --git a/packages/fuel-indexer/src/ffi.rs b/packages/fuel-indexer/src/ffi.rs index ee57ad58e..5e380c155 100644 --- a/packages/fuel-indexer/src/ffi.rs +++ b/packages/fuel-indexer/src/ffi.rs @@ -291,6 +291,12 @@ fn put_many_to_many_record( Ok(()) } +/// When called from WASM it will terminate the execution and return the error +/// code. +pub fn early_exit(err_code: i32) -> Result<(), WasmIndexerError> { + Err(WasmIndexerError::from(err_code)) +} + /// Get the exports for the given store and environment. pub fn get_exports(store: &mut Store, env: &wasmer::FunctionEnv) -> Exports { let mut exports = Exports::new(); @@ -300,7 +306,9 @@ pub fn get_exports(store: &mut Store, env: &wasmer::FunctionEnv) -> Ex let f_log_data = Function::new_typed_with_env(store, env, log_data); let f_put_many_to_many_record = Function::new_typed_with_env(store, env, put_many_to_many_record); + let f_early_exit = Function::new_typed(store, early_exit); + exports.insert("ff_early_exit".to_string(), f_early_exit); exports.insert("ff_get_object".to_string(), f_get_obj); exports.insert("ff_put_object".to_string(), f_put_obj); exports.insert( From 018981f7a87a7c947ada0be046b47b5ae097741d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Tue, 12 Sep 2023 11:18:56 +0200 Subject: [PATCH 06/20] fmt --- packages/fuel-indexer/src/ffi.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/fuel-indexer/src/ffi.rs b/packages/fuel-indexer/src/ffi.rs index 5e380c155..7d24efe36 100644 --- a/packages/fuel-indexer/src/ffi.rs +++ b/packages/fuel-indexer/src/ffi.rs @@ -308,7 +308,7 @@ pub fn get_exports(store: &mut Store, env: &wasmer::FunctionEnv) -> Ex Function::new_typed_with_env(store, env, put_many_to_many_record); let f_early_exit = Function::new_typed(store, early_exit); - exports.insert("ff_early_exit".to_string(), f_early_exit); + exports.insert("ff_early_exit".to_string(), f_early_exit); exports.insert("ff_get_object".to_string(), f_get_obj); exports.insert("ff_put_object".to_string(), f_put_obj); exports.insert( From 969be9029ae9f7e12536819497e4adeca3c22f91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Tue, 12 Sep 2023 14:13:39 +0200 Subject: [PATCH 07/20] add wasm exit codes test and fix an expect msg --- Cargo.lock | 1 + packages/fuel-indexer-database/src/lib.rs | 2 + packages/fuel-indexer-lib/src/lib.rs | 2 +- packages/fuel-indexer-tests/Cargo.toml | 1 + packages/fuel-indexer-tests/tests/service.rs | 128 +++++++++++++++++++ packages/fuel-indexer/src/database.rs | 4 +- plugins/forc-index/src/utils.rs | 2 +- 7 files changed, 136 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9779e287a..64d3ed59b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3480,6 +3480,7 @@ dependencies = [ "fuel-indexer-types", "fuel-indexer-utils", "fuel-tx 0.35.3", + "fuel-types 0.35.3", "fuels", "fuels-macros", "futures", diff --git a/packages/fuel-indexer-database/src/lib.rs b/packages/fuel-indexer-database/src/lib.rs index 9a8cc326a..1774b888a 100644 --- a/packages/fuel-indexer-database/src/lib.rs +++ b/packages/fuel-indexer-database/src/lib.rs @@ -28,6 +28,8 @@ pub enum IndexerDatabaseError { Unknown, #[error("You don't own this indexer.")] NotYourIndexer, + #[error("No table mapping exists for TypeId({0:?})")] + TableMappingDoesNotExist(i64), } #[derive(Debug)] diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index 82b9bf12e..1c18d1ee6 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -36,7 +36,7 @@ impl ExecutionSource { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum WasmIndexerError { DeserializationError = 1, SerializationError, diff --git a/packages/fuel-indexer-tests/Cargo.toml b/packages/fuel-indexer-tests/Cargo.toml index 9a5d3e119..ed1884b48 100644 --- a/packages/fuel-indexer-tests/Cargo.toml +++ b/packages/fuel-indexer-tests/Cargo.toml @@ -38,6 +38,7 @@ fuel-indexer-postgres = { workspace = true, features = ["metrics"] } fuel-indexer-schema = { workspace = true, default-features = true } fuel-indexer-types = { workspace = true } fuel-indexer-utils = { workspace = true } +fuel-types = { workspace = true } fuel-tx = { workspace = true } fuels = { features = ["fuel-core-lib", "std"], version = "0.46" } fuels-macros = { version = "0.46", default-features = false } diff --git a/packages/fuel-indexer-tests/tests/service.rs b/packages/fuel-indexer-tests/tests/service.rs index 3dda8bdda..20c0ae13d 100644 --- a/packages/fuel-indexer-tests/tests/service.rs +++ b/packages/fuel-indexer-tests/tests/service.rs @@ -1,7 +1,10 @@ extern crate alloc; +use fuel_indexer::prelude::fuel::{BlockData, Consensus, Header}; use fuel_indexer::{Executor, IndexerConfig, WasmIndexExecutor}; +use fuel_indexer_lib::WasmIndexerError; use fuel_indexer_lib::{config::DatabaseConfig, manifest::Manifest}; use fuel_indexer_tests::fixtures::TestPostgresDb; +use fuel_types::Bytes32; use std::str::FromStr; use std::sync::{atomic::AtomicBool, Arc}; @@ -90,3 +93,128 @@ async fn test_wasm_executor_can_meter_execution() { ), } } + +#[tokio::test] +async fn test_wasm_executor_exit_codes() { + use async_std::{fs::File, io::ReadExt}; + + if let Ok(mut current_dir) = std::env::current_dir() { + if current_dir.ends_with("fuel-indexer-tests") { + current_dir.pop(); + current_dir.pop(); + } + + if let Err(e) = std::env::set_current_dir(current_dir) { + eprintln!("Failed to change directory: {}", e); + } + } + + let manifest = Manifest::from_file( + "packages/fuel-indexer-tests/indexers/fuel-indexer-test/fuel_indexer_test.yaml", + ) + .unwrap(); + + match &manifest.module() { + fuel_indexer_lib::manifest::Module::Wasm(ref module) => { + let mut bytes = Vec::::new(); + let mut file = File::open(module).await.unwrap(); + file.read_to_end(&mut bytes).await.unwrap(); + + let test_db = TestPostgresDb::new().await.unwrap(); + let pool = fuel_indexer_database::IndexerConnectionPool::Postgres( + test_db.pool.clone(), + ); + let config = IndexerConfig::default(); + + let schema_version = manifest + .graphql_schema_content() + .unwrap() + .version() + .to_string(); + + let kill_switch = Arc::new(AtomicBool::new(false)); + let mut executor = WasmIndexExecutor::new( + &config, + &manifest, + bytes.clone(), + pool, + schema_version, + kill_switch.clone(), + ) + .await + .unwrap(); + + let block_1_header = Header { + id: Bytes32::zeroed(), + da_height: 1, + transactions_count: 0, + message_receipt_count: 0, + transactions_root: Bytes32::zeroed(), + message_receipt_root: Bytes32::zeroed(), + height: 1, + prev_root: Bytes32::zeroed(), + time: 0, + application_hash: Bytes32::zeroed(), + }; + + let block_1 = BlockData { + height: 1, + id: Bytes32::zeroed(), + header: block_1_header, + producer: None, + time: 0, + consensus: Consensus::Unknown, + transactions: vec![], + }; + + let blocks: Vec = vec![block_1]; + + // Test 1 + + // Since we are only starting the executor, and not the whole Fuel + // Indexer Servoce, the database tables are not initialized, and any + // database operation performed by the executor will fail. + if let Err(e) = executor + .handle_events(kill_switch.clone(), blocks.clone()) + .await + { + if let fuel_indexer::IndexerError::RuntimeError(e) = e { + match e.downcast::() { + Ok(err_code) => { + assert_eq!(err_code, WasmIndexerError::DatabaseError); + } + Err(e) => { + panic!("Expected a WASM exit code but got: {e:?}"); + } + } + } else { + panic!("Expected a RuntimeError but got: {e:?}"); + } + } + + // Test 2 + + // Trigger kill switch. + + kill_switch.store(true, std::sync::atomic::Ordering::SeqCst); + + if let Err(e) = executor.handle_events(kill_switch, blocks.clone()).await { + if let fuel_indexer::IndexerError::RuntimeError(e) = e { + match e.downcast::() { + Ok(err_code) => { + assert_eq!(err_code, WasmIndexerError::KillSwitch); + } + Err(e) => { + panic!("Expected a WASM exit code but got: {e:?}"); + } + } + } else { + panic!("Expected a RuntimeError but got: {e:?}"); + } + } + } + _ => panic!( + "Expected a WASM module in the manifest but got a Native module instead." + ), + } +} diff --git a/packages/fuel-indexer/src/database.rs b/packages/fuel-indexer/src/database.rs index 50a92551a..c19bddd9e 100644 --- a/packages/fuel-indexer/src/database.rs +++ b/packages/fuel-indexer/src/database.rs @@ -1,5 +1,5 @@ use crate::{IndexerConfig, IndexerError, IndexerResult, Manifest}; -use fuel_indexer_database::{queries, IndexerConnection, IndexerConnectionPool}; +use fuel_indexer_database::{queries, IndexerConnection, IndexerConnectionPool, IndexerDatabaseError}; use fuel_indexer_lib::{ fully_qualified_namespace, graphql::types::IdCol, utils::format_sql_query, }; @@ -188,7 +188,7 @@ Do your WASM modules need to be rebuilt? type_id: i64, object_id: String, ) -> IndexerResult>> { - let table = &self.tables[&type_id]; + let table = &self.tables.get(&type_id).ok_or(IndexerDatabaseError::TableMappingDoesNotExist(type_id))?; let query = self.get_query(table, &object_id); let conn = self .stashed diff --git a/plugins/forc-index/src/utils.rs b/plugins/forc-index/src/utils.rs index 3d0422050..5dc57c46b 100644 --- a/plugins/forc-index/src/utils.rs +++ b/plugins/forc-index/src/utils.rs @@ -136,7 +136,7 @@ pub fn cargo_workspace_root_dir( // Use serde to extract the "target_directory" field let target_directory = metadata_json["workspace_root"] .as_str() - .expect("target_directory not found or invalid"); + .expect("workspace_root not found or invalid"); Ok(target_directory.into()) } From b17177fbe6d231cca3088d7e0abecab33585d756 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Tue, 12 Sep 2023 14:14:10 +0200 Subject: [PATCH 08/20] fmt --- packages/fuel-indexer/src/database.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/fuel-indexer/src/database.rs b/packages/fuel-indexer/src/database.rs index c19bddd9e..efde48828 100644 --- a/packages/fuel-indexer/src/database.rs +++ b/packages/fuel-indexer/src/database.rs @@ -1,5 +1,7 @@ use crate::{IndexerConfig, IndexerError, IndexerResult, Manifest}; -use fuel_indexer_database::{queries, IndexerConnection, IndexerConnectionPool, IndexerDatabaseError}; +use fuel_indexer_database::{ + queries, IndexerConnection, IndexerConnectionPool, IndexerDatabaseError, +}; use fuel_indexer_lib::{ fully_qualified_namespace, graphql::types::IdCol, utils::format_sql_query, }; @@ -188,7 +190,10 @@ Do your WASM modules need to be rebuilt? type_id: i64, object_id: String, ) -> IndexerResult>> { - let table = &self.tables.get(&type_id).ok_or(IndexerDatabaseError::TableMappingDoesNotExist(type_id))?; + let table = &self + .tables + .get(&type_id) + .ok_or(IndexerDatabaseError::TableMappingDoesNotExist(type_id))?; let query = self.get_query(table, &object_id); let conn = self .stashed From 63920fa599c08bdb0cc0d1987abf8823c7a2aa09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Tue, 12 Sep 2023 14:17:18 +0200 Subject: [PATCH 09/20] cargo sort --- packages/fuel-indexer-tests/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/fuel-indexer-tests/Cargo.toml b/packages/fuel-indexer-tests/Cargo.toml index ed1884b48..18247601c 100644 --- a/packages/fuel-indexer-tests/Cargo.toml +++ b/packages/fuel-indexer-tests/Cargo.toml @@ -38,8 +38,8 @@ fuel-indexer-postgres = { workspace = true, features = ["metrics"] } fuel-indexer-schema = { workspace = true, default-features = true } fuel-indexer-types = { workspace = true } fuel-indexer-utils = { workspace = true } -fuel-types = { workspace = true } fuel-tx = { workspace = true } +fuel-types = { workspace = true } fuels = { features = ["fuel-core-lib", "std"], version = "0.46" } fuels-macros = { version = "0.46", default-features = false } futures = "0.3" From 1dc9f5fb9b25646a3988d2312ce001c9869685e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Tue, 12 Sep 2023 07:30:39 -0700 Subject: [PATCH 10/20] Update packages/fuel-indexer-lib/src/lib.rs Co-authored-by: rashad --- packages/fuel-indexer-lib/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index 1c18d1ee6..97a4e08e2 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -93,7 +93,7 @@ impl std::fmt::Display for WasmIndexerError { Self::DatabaseError => { write!(f, "Failed performing a database operation") } - Self::GeneralError => write!(f, "A WASM error occurred"), + Self::GeneralError => write!(f, "Some unspecified WASM error occurred."), } } } From 4cff8b3ce5ae9fd84f31eee10e6efda73d4c0abd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Tue, 12 Sep 2023 07:30:53 -0700 Subject: [PATCH 11/20] Update packages/fuel-indexer-lib/src/lib.rs Co-authored-by: rashad --- packages/fuel-indexer-lib/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index 97a4e08e2..b401fc78f 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -73,7 +73,7 @@ impl std::fmt::Display for WasmIndexerError { write!(f, "Failed to serialize") } Self::DeserializationError => { - write!(f, "Failed to deserialize") + write!(f, "Failed to deserialize object.") } Self::UnableToSaveListType => { write!(f, "Failed to save list") From a6d3d8879972c6c49284cb3f075d4fb1c2a7d760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Tue, 12 Sep 2023 07:38:35 -0700 Subject: [PATCH 12/20] Update packages/fuel-indexer-lib/src/lib.rs Co-authored-by: rashad --- packages/fuel-indexer-lib/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index b401fc78f..5eb19735c 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -70,7 +70,7 @@ impl std::fmt::Display for WasmIndexerError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::SerializationError => { - write!(f, "Failed to serialize") + write!(f, "Failed to serialize object.") } Self::DeserializationError => { write!(f, "Failed to deserialize object.") From 0af2583221e10bf009504ca792cdefc0d5095218 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Tue, 12 Sep 2023 16:53:18 +0200 Subject: [PATCH 13/20] more review feedback --- packages/fuel-indexer-lib/src/lib.rs | 27 +++++++++----------- packages/fuel-indexer-tests/tests/service.rs | 2 +- packages/fuel-indexer/src/ffi.rs | 2 +- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index 5eb19735c..1a26e983a 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -36,9 +36,9 @@ impl ExecutionSource { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)] pub enum WasmIndexerError { - DeserializationError = 1, + DeserializationError = 0, SerializationError, PutObjectError, UnableToSaveListType, @@ -49,18 +49,17 @@ pub enum WasmIndexerError { GeneralError, } -impl From for WasmIndexerError { - fn from(value: i32) -> Self { +impl From for WasmIndexerError { + fn from(value: u32) -> Self { match value { - 0 => unreachable!("WasmIndexerError index starts at 1"), - 1 => Self::DeserializationError, - 2 => Self::SerializationError, - 3 => Self::PutObjectError, - 4 => Self::UnableToSaveListType, - 5 => Self::UninitializedMemory, - 6 => Self::UnableToFetchLogString, - 7 => Self::KillSwitch, - 8 => Self::DatabaseError, + 0 => Self::DeserializationError, + 1 => Self::SerializationError, + 2 => Self::PutObjectError, + 3 => Self::UnableToSaveListType, + 4 => Self::UninitializedMemory, + 5 => Self::UnableToFetchLogString, + 6 => Self::KillSwitch, + 7 => Self::DatabaseError, _ => Self::GeneralError, } } @@ -98,8 +97,6 @@ impl std::fmt::Display for WasmIndexerError { } } -impl std::error::Error for WasmIndexerError {} - /// Return a fully qualified indexer namespace. pub fn fully_qualified_namespace(namespace: &str, identifier: &str) -> String { format!("{}_{}", namespace, identifier) diff --git a/packages/fuel-indexer-tests/tests/service.rs b/packages/fuel-indexer-tests/tests/service.rs index 9d98012aa..814dba91b 100644 --- a/packages/fuel-indexer-tests/tests/service.rs +++ b/packages/fuel-indexer-tests/tests/service.rs @@ -90,7 +90,7 @@ async fn test_wasm_executor_can_meter_execution() { } #[tokio::test] -async fn test_wasm_executor_exit_codes() { +async fn test_wasm_executor_error_codes() { use async_std::{fs::File, io::ReadExt}; if let Ok(mut current_dir) = std::env::current_dir() { diff --git a/packages/fuel-indexer/src/ffi.rs b/packages/fuel-indexer/src/ffi.rs index 7d24efe36..be4b763c7 100644 --- a/packages/fuel-indexer/src/ffi.rs +++ b/packages/fuel-indexer/src/ffi.rs @@ -293,7 +293,7 @@ fn put_many_to_many_record( /// When called from WASM it will terminate the execution and return the error /// code. -pub fn early_exit(err_code: i32) -> Result<(), WasmIndexerError> { +pub fn early_exit(err_code: u32) -> Result<(), WasmIndexerError> { Err(WasmIndexerError::from(err_code)) } From a4549a8db748ff5b9ac4766ab7d5fa10f3835ab4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Wed, 13 Sep 2023 13:37:12 +0200 Subject: [PATCH 14/20] improve ealy_exit function --- packages/fuel-indexer-plugin/src/wasm.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/packages/fuel-indexer-plugin/src/wasm.rs b/packages/fuel-indexer-plugin/src/wasm.rs index a42a18b24..721bd397c 100644 --- a/packages/fuel-indexer-plugin/src/wasm.rs +++ b/packages/fuel-indexer-plugin/src/wasm.rs @@ -25,7 +25,7 @@ extern "C" { fn ff_log_data(ptr: *const u8, len: u32, log_level: u32); fn ff_put_object(type_id: i64, ptr: *const u8, len: u32); fn ff_put_many_to_many_record(ptr: *const u8, len: u32); - fn ff_early_exit(err_code: i32); + fn ff_early_exit(err_code: u32); } // TODO: more to do here, hook up to 'impl log::Log for Logger' @@ -102,8 +102,7 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { let buff = if let Ok(bytes) = bincode::serialize(&id.to_string()) { bytes } else { - ff_early_exit(WasmIndexerError::SerializationError as i32); - unreachable!(); + early_exit(WasmIndexerError::SerializationError); }; let mut bufflen = (buff.len() as u32).to_le_bytes(); @@ -116,8 +115,7 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { match deserialize(&bytes) { Ok(vec) => Some(Self::from_row(vec)), Err(_) => { - ff_early_exit(WasmIndexerError::DeserializationError as i32); - unreachable!() + early_exit(WasmIndexerError::DeserializationError); } }; } @@ -158,3 +156,10 @@ fn alloc_fn(size: u32) -> *const u8 { fn dealloc_fn(ptr: *mut u8, len: usize) { let _vec = unsafe { Vec::from_raw_parts(ptr, len, len) }; } + +#[no_mangle] +/// Immediately terminate WASM execution with the specified error code. +fn early_exit(err_code: WasmIndexerError) -> ! { + unsafe { ff_early_exit(err_code as u32) } + unreachable!("Expected termination of WASM exetution after a call to ff_early_exit.") +} From 46507e8ef0082f6d0becdff4a51bbea7cfb9a2cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Wed, 13 Sep 2023 22:51:47 +0200 Subject: [PATCH 15/20] return deserialization error if decoding blockdata fails --- packages/fuel-indexer-macros/src/wasm.rs | 3 +-- packages/fuel-indexer-plugin/src/lib.rs | 3 +++ packages/fuel-indexer-plugin/src/wasm.rs | 2 +- .../trybuild/pass_if_indexer_is_valid_multi_type.rs | 3 +++ .../trybuild/pass_if_indexer_is_valid_single_type.rs | 3 +++ .../trybuild/pass_if_unsupported_types_are_used.rs | 3 +++ 6 files changed, 14 insertions(+), 3 deletions(-) diff --git a/packages/fuel-indexer-macros/src/wasm.rs b/packages/fuel-indexer-macros/src/wasm.rs index 8d3a0ed7b..95838c13b 100644 --- a/packages/fuel-indexer-macros/src/wasm.rs +++ b/packages/fuel-indexer-macros/src/wasm.rs @@ -17,10 +17,9 @@ pub fn handler_block_wasm( let blocks: Vec = match deserialize(&bytes) { Ok(blocks) => blocks, Err(msg) => { - // TODO: probably need some error codes to send back to runtime. core::mem::forget(bytes); Logger::error(&msg); - return; + early_exit(WasmIndexerError::DeserializationError) } }; core::mem::forget(bytes); diff --git a/packages/fuel-indexer-plugin/src/lib.rs b/packages/fuel-indexer-plugin/src/lib.rs index 07127fa3a..35650c192 100644 --- a/packages/fuel-indexer-plugin/src/lib.rs +++ b/packages/fuel-indexer-plugin/src/lib.rs @@ -10,6 +10,9 @@ pub mod types { pub use fuel_indexer_types::fuel::{BlockData, TxId}; pub use fuel_indexer_types::scalar::UID; + // For use with `early_exit` function to terminate execution on error. + pub use fuel_indexer_lib::WasmIndexerError; + // Traits needed to access client type fields. Could also include this as a sub-module // of `fuel_indexer_types::fuel`. pub use fuel_indexer_types::fuel::field::*; diff --git a/packages/fuel-indexer-plugin/src/wasm.rs b/packages/fuel-indexer-plugin/src/wasm.rs index 721bd397c..e8af8578e 100644 --- a/packages/fuel-indexer-plugin/src/wasm.rs +++ b/packages/fuel-indexer-plugin/src/wasm.rs @@ -159,7 +159,7 @@ fn dealloc_fn(ptr: *mut u8, len: usize) { #[no_mangle] /// Immediately terminate WASM execution with the specified error code. -fn early_exit(err_code: WasmIndexerError) -> ! { +pub fn early_exit(err_code: WasmIndexerError) -> ! { unsafe { ff_early_exit(err_code as u32) } unreachable!("Expected termination of WASM exetution after a call to ff_early_exit.") } diff --git a/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_multi_type.rs b/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_multi_type.rs index 6fd0a8cb5..7a6196747 100644 --- a/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_multi_type.rs +++ b/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_multi_type.rs @@ -10,6 +10,9 @@ fn ff_put_object(_inp: ()) {} #[no_mangle] fn ff_put_many_to_many_record(_inp: ()) {} +#[no_mangle] +fn ff_early_exit(_inp: ()) {} + #[indexer(manifest = "packages/fuel-indexer-tests/trybuild/simple_wasm.yaml")] mod indexer { fn function_one(event: SomeEvent) { diff --git a/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_single_type.rs b/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_single_type.rs index dc6cd2e88..c097f1e24 100644 --- a/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_single_type.rs +++ b/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_single_type.rs @@ -10,6 +10,9 @@ fn ff_put_object(_inp: ()) {} #[no_mangle] fn ff_put_many_to_many_record(_inp: ()) {} +#[no_mangle] +fn ff_early_exit(_inp: ()) {} + #[indexer(manifest = "packages/fuel-indexer-tests/trybuild/simple_wasm.yaml")] mod indexer { fn function_one(event: SomeEvent) { diff --git a/packages/fuel-indexer-tests/trybuild/pass_if_unsupported_types_are_used.rs b/packages/fuel-indexer-tests/trybuild/pass_if_unsupported_types_are_used.rs index 09524cbd5..bcc01f01b 100644 --- a/packages/fuel-indexer-tests/trybuild/pass_if_unsupported_types_are_used.rs +++ b/packages/fuel-indexer-tests/trybuild/pass_if_unsupported_types_are_used.rs @@ -10,6 +10,9 @@ fn ff_put_object(_inp: ()) {} #[no_mangle] fn ff_put_many_to_many_record(_inp: ()) {} +#[no_mangle] +fn ff_early_exit(_inp: ()) {} + #[indexer(manifest = "packages/fuel-indexer-tests/trybuild/simple_wasm.yaml")] mod indexer { fn function_one(event: SomeEvent) { From c295242011fc0e8fc4de6526c9c5a2e5783fc3c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Wed, 13 Sep 2023 22:52:27 +0200 Subject: [PATCH 16/20] simpler way to downcast --- packages/fuel-indexer/src/executor.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/fuel-indexer/src/executor.rs b/packages/fuel-indexer/src/executor.rs index 24d3d9c88..838c518f2 100644 --- a/packages/fuel-indexer/src/executor.rs +++ b/packages/fuel-indexer/src/executor.rs @@ -25,7 +25,6 @@ use fuel_vm::state::ProgramState as ClientProgramState; use futures::Future; use itertools::Itertools; use std::{ - error::Error, marker::{Send, Sync}, path::Path, str::FromStr, @@ -869,10 +868,7 @@ impl Executor for WasmIndexExecutor { self.db.lock().await.revert_transaction().await?; return Err(IndexerError::RunTimeLimitExceededError); } else { - if let Some(e) = e - .source() - .and_then(|e| e.downcast_ref::()) - { + if let Some(e) = e.downcast_ref::() { match e { // Termination due to kill switch is an expected behavior. WasmIndexerError::KillSwitch => { From 18f5bd301293b3e0e8b465d2c63b75f3d76303a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Fri, 15 Sep 2023 07:33:36 -0700 Subject: [PATCH 17/20] Update packages/fuel-indexer-lib/src/lib.rs Co-authored-by: rashad --- packages/fuel-indexer-lib/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index 1a26e983a..1fd873c2e 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -87,7 +87,7 @@ impl std::fmt::Display for WasmIndexerError { write!(f, "Failed to fetch log string") } Self::KillSwitch => { - write!(f, "Kill switch has been triggered") + write!(f, "Indexer kill switch has been triggered. Indexer will halt.") } Self::DatabaseError => { write!(f, "Failed performing a database operation") From a00ca22ac1c4143c7e7ce6b2e0cec38cd9476d86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Fri, 15 Sep 2023 16:33:53 +0200 Subject: [PATCH 18/20] remove unused variant --- packages/fuel-indexer-lib/src/lib.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index 1fd873c2e..3c0e99fcc 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -43,7 +43,6 @@ pub enum WasmIndexerError { PutObjectError, UnableToSaveListType, UninitializedMemory, - UnableToFetchLogString, KillSwitch, DatabaseError, GeneralError, @@ -57,7 +56,6 @@ impl From for WasmIndexerError { 2 => Self::PutObjectError, 3 => Self::UnableToSaveListType, 4 => Self::UninitializedMemory, - 5 => Self::UnableToFetchLogString, 6 => Self::KillSwitch, 7 => Self::DatabaseError, _ => Self::GeneralError, @@ -83,9 +81,6 @@ impl std::fmt::Display for WasmIndexerError { Self::UninitializedMemory => { write!(f, "Failed to create MemoryView for indexer") } - Self::UnableToFetchLogString => { - write!(f, "Failed to fetch log string") - } Self::KillSwitch => { write!(f, "Indexer kill switch has been triggered. Indexer will halt.") } From 0de86322f3d32d720a1cea7342e37d3cc8d6b7db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Fri, 15 Sep 2023 07:35:12 -0700 Subject: [PATCH 19/20] Update packages/fuel-indexer-lib/src/lib.rs Co-authored-by: rashad --- packages/fuel-indexer-lib/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index 3c0e99fcc..d3cba612f 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -85,7 +85,7 @@ impl std::fmt::Display for WasmIndexerError { write!(f, "Indexer kill switch has been triggered. Indexer will halt.") } Self::DatabaseError => { - write!(f, "Failed performing a database operation") + write!(f, "Database operation failed.") } Self::GeneralError => write!(f, "Some unspecified WASM error occurred."), } From 0baa5a43c342ce531b813aca86cb77c91039e9e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Fri, 15 Sep 2023 19:27:17 +0200 Subject: [PATCH 20/20] fmt --- packages/fuel-indexer-lib/src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index d3cba612f..3eacc8d82 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -82,7 +82,10 @@ impl std::fmt::Display for WasmIndexerError { write!(f, "Failed to create MemoryView for indexer") } Self::KillSwitch => { - write!(f, "Indexer kill switch has been triggered. Indexer will halt.") + write!( + f, + "Indexer kill switch has been triggered. Indexer will halt." + ) } Self::DatabaseError => { write!(f, "Database operation failed.")