diff --git a/Cargo.lock b/Cargo.lock index 19512c580..ca52efff0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3479,6 +3479,7 @@ dependencies = [ "fuel-indexer-types", "fuel-indexer-utils", "fuel-tx 0.35.3", + "fuel-types 0.35.3", "fuels", "fuels-macros", "futures", diff --git a/packages/fuel-indexer-api-server/src/uses.rs b/packages/fuel-indexer-api-server/src/uses.rs index d1b25e79b..ec46aaec7 100644 --- a/packages/fuel-indexer-api-server/src/uses.rs +++ b/packages/fuel-indexer-api-server/src/uses.rs @@ -180,6 +180,12 @@ pub(crate) async fn remove_indexer( })) .await?; + // We have early termination on a kill switch. Yet, it is still possible for + // the database entries to be removed before the indexer has time to act on + // kill switch being triggered. Adding a small delay here should prevent + // unnecessary DatabaseError's appearing in the logs. + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // Allways remove data when removing an indexer. if let Err(e) = queries::remove_indexer(&mut conn, &namespace, &identifier, true).await diff --git a/packages/fuel-indexer-database/src/lib.rs b/packages/fuel-indexer-database/src/lib.rs index 9a8cc326a..1774b888a 100644 --- a/packages/fuel-indexer-database/src/lib.rs +++ b/packages/fuel-indexer-database/src/lib.rs @@ -28,6 +28,8 @@ pub enum IndexerDatabaseError { Unknown, #[error("You don't own this indexer.")] NotYourIndexer, + #[error("No table mapping exists for TypeId({0:?})")] + TableMappingDoesNotExist(i64), } #[derive(Debug)] diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index cbc7ec340..3eacc8d82 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -36,6 +36,65 @@ impl ExecutionSource { } } +#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)] +pub enum WasmIndexerError { + DeserializationError = 0, + SerializationError, + PutObjectError, + UnableToSaveListType, + UninitializedMemory, + KillSwitch, + DatabaseError, + GeneralError, +} + +impl From for WasmIndexerError { + fn from(value: u32) -> Self { + match value { + 0 => Self::DeserializationError, + 1 => Self::SerializationError, + 2 => Self::PutObjectError, + 3 => Self::UnableToSaveListType, + 4 => Self::UninitializedMemory, + 6 => Self::KillSwitch, + 7 => Self::DatabaseError, + _ => Self::GeneralError, + } + } +} + +impl std::fmt::Display for WasmIndexerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::SerializationError => { + write!(f, "Failed to serialize object.") + } + Self::DeserializationError => { + write!(f, "Failed to deserialize object.") + } + Self::UnableToSaveListType => { + write!(f, "Failed to save list") + } + Self::PutObjectError => { + write!(f, "Failed to save object") + } + Self::UninitializedMemory => { + write!(f, "Failed to create MemoryView for indexer") + } + Self::KillSwitch => { + write!( + f, + "Indexer kill switch has been triggered. Indexer will halt." + ) + } + Self::DatabaseError => { + write!(f, "Database operation failed.") + } + Self::GeneralError => write!(f, "Some unspecified WASM error occurred."), + } + } +} + /// Return a fully qualified indexer namespace. pub fn fully_qualified_namespace(namespace: &str, identifier: &str) -> String { format!("{}_{}", namespace, identifier) diff --git a/packages/fuel-indexer-macros/src/decoder.rs b/packages/fuel-indexer-macros/src/decoder.rs index f360976be..d875e3f80 100644 --- a/packages/fuel-indexer-macros/src/decoder.rs +++ b/packages/fuel-indexer-macros/src/decoder.rs @@ -833,7 +833,7 @@ impl From for TokenStream { .map(|query| query.to_string()) .collect::>(); - d.lock().await.put_many_to_many_record(queries).await; + d.lock().await.put_many_to_many_record(queries).await.expect(&format!("Entity::save_many_to_many for {} failed.", stringify!(#ident))); } } None => {} @@ -846,12 +846,15 @@ impl From for TokenStream { match &db { Some(d) => { match d.lock().await.get_object(Self::TYPE_ID, id.to_string()).await { - Some(bytes) => { + Ok(Some(bytes)) => { let columns: Vec = bincode::deserialize(&bytes).expect("Failed to deserialize Vec for Entity::load."); let obj = Self::from_row(columns); Some(obj) }, - None => None, + Ok(None) => None, + Err(e) => { + panic!("Entity::load for {} failed.", stringify!(#ident)) + } } } None => None, @@ -868,7 +871,7 @@ impl From for TokenStream { Self::TYPE_ID, self.to_row(), serialize(&self.to_row()) - ).await; + ).await.expect(&format!("Entity::save for {} failed.", stringify!(#ident))); } None => {}, } diff --git a/packages/fuel-indexer-macros/src/wasm.rs b/packages/fuel-indexer-macros/src/wasm.rs index 8d3a0ed7b..95838c13b 100644 --- a/packages/fuel-indexer-macros/src/wasm.rs +++ b/packages/fuel-indexer-macros/src/wasm.rs @@ -17,10 +17,9 @@ pub fn handler_block_wasm( let blocks: Vec = match deserialize(&bytes) { Ok(blocks) => blocks, Err(msg) => { - // TODO: probably need some error codes to send back to runtime. core::mem::forget(bytes); Logger::error(&msg); - return; + early_exit(WasmIndexerError::DeserializationError) } }; core::mem::forget(bytes); diff --git a/packages/fuel-indexer-plugin/src/lib.rs b/packages/fuel-indexer-plugin/src/lib.rs index 07127fa3a..35650c192 100644 --- a/packages/fuel-indexer-plugin/src/lib.rs +++ b/packages/fuel-indexer-plugin/src/lib.rs @@ -10,6 +10,9 @@ pub mod types { pub use fuel_indexer_types::fuel::{BlockData, TxId}; pub use fuel_indexer_types::scalar::UID; + // For use with `early_exit` function to terminate execution on error. + pub use fuel_indexer_lib::WasmIndexerError; + // Traits needed to access client type fields. Could also include this as a sub-module // of `fuel_indexer_types::fuel`. pub use fuel_indexer_types::fuel::field::*; diff --git a/packages/fuel-indexer-plugin/src/wasm.rs b/packages/fuel-indexer-plugin/src/wasm.rs index 38c409a1f..e8af8578e 100644 --- a/packages/fuel-indexer-plugin/src/wasm.rs +++ b/packages/fuel-indexer-plugin/src/wasm.rs @@ -4,6 +4,7 @@ use alloc::vec::Vec; use fuel_indexer_lib::{ graphql::MAX_FOREIGN_KEY_LIST_FIELDS, utils::{deserialize, serialize}, + WasmIndexerError, }; use fuel_indexer_schema::{ join::{JoinMetadata, RawQuery}, @@ -16,12 +17,15 @@ pub use hex::FromHex; pub use sha2::{Digest, Sha256}; pub use std::collections::{HashMap, HashSet}; +// These are instantiated with functions which return +// `Result`. `wasmer` unwraps the `Result` and uses the +// `Err` variant for ealy exit. extern "C" { - // TODO: error codes? or just panic and let the runtime handle it? fn ff_get_object(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8; + fn ff_log_data(ptr: *const u8, len: u32, log_level: u32); fn ff_put_object(type_id: i64, ptr: *const u8, len: u32); fn ff_put_many_to_many_record(ptr: *const u8, len: u32); - fn ff_log_data(ptr: *const u8, len: u32, log_level: u32); + fn ff_early_exit(err_code: u32); } // TODO: more to do here, hook up to 'impl log::Log for Logger' @@ -49,18 +53,28 @@ impl Logger { } } +/// Trait for a type entity. +/// +/// Any entity type that will be processed through a WASM indexer is required to implement this trait. pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { + /// Unique identifier for a type. const TYPE_ID: i64; + + /// Necessary metadata for saving an entity's list type fields. const JOIN_METADATA: Option<[Option>; MAX_FOREIGN_KEY_LIST_FIELDS]>; + /// Convert database row representation into an instance of an entity. fn from_row(vec: Vec) -> Self; + /// Convert an instance of an entity into row representation for use in a database. fn to_row(&self) -> Vec; + /// Returns an entity's internal type ID. fn type_id(&self) -> i64 { Self::TYPE_ID } + /// Saves a record that contains a list of multiple elements. fn save_many_to_many(&self) { if let Some(meta) = Self::JOIN_METADATA { let items = meta.iter().filter_map(|x| x.clone()).collect::>(); @@ -71,13 +85,26 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { .filter(|query| !query.is_empty()) .collect::>(); let bytes = serialize(&queries); - unsafe { ff_put_many_to_many_record(bytes.as_ptr(), bytes.len() as u32) } + unsafe { + ff_put_many_to_many_record(bytes.as_ptr(), bytes.len() as u32); + } } } + /// Loads a record given a UID. fn load(id: UID) -> Option { + Self::load_unsafe(id) + } + + /// Loads a record through the FFI with the WASM runtime and checks for errors. + fn load_unsafe(id: UID) -> Option { unsafe { - let buff = bincode::serialize(&id.to_string()).unwrap(); + let buff = if let Ok(bytes) = bincode::serialize(&id.to_string()) { + bytes + } else { + early_exit(WasmIndexerError::SerializationError); + }; + let mut bufflen = (buff.len() as u32).to_le_bytes(); let ptr = ff_get_object(Self::TYPE_ID, buff.as_ptr(), bufflen.as_mut_ptr()); @@ -85,26 +112,36 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { if !ptr.is_null() { let len = u32::from_le_bytes(bufflen) as usize; let bytes = Vec::from_raw_parts(ptr, len, len); - let vec = deserialize(&bytes).expect("Bad serialization."); - - return Some(Self::from_row(vec)); + match deserialize(&bytes) { + Ok(vec) => Some(Self::from_row(vec)), + Err(_) => { + early_exit(WasmIndexerError::DeserializationError); + } + }; } None } } + /// Saves a record. fn save(&self) { + self.save_unsafe() + } + + /// Saves a record through the FFI with the WASM runtime and checks for errors. + fn save_unsafe(&self) { unsafe { let buf = serialize(&self.to_row()); - ff_put_object(Self::TYPE_ID, buf.as_ptr(), buf.len() as u32) + ff_put_object(Self::TYPE_ID, buf.as_ptr(), buf.len() as u32); } - self.save_many_to_many(); + self.save_many_to_many() } } #[no_mangle] +/// Allocation function to be called by an executor in a WASM runtime. fn alloc_fn(size: u32) -> *const u8 { let vec = Vec::with_capacity(size as usize); let ptr = vec.as_ptr(); @@ -115,6 +152,14 @@ fn alloc_fn(size: u32) -> *const u8 { } #[no_mangle] +/// Deallocation function to be called by an executor in a WASM runtime. fn dealloc_fn(ptr: *mut u8, len: usize) { let _vec = unsafe { Vec::from_raw_parts(ptr, len, len) }; } + +#[no_mangle] +/// Immediately terminate WASM execution with the specified error code. +pub fn early_exit(err_code: WasmIndexerError) -> ! { + unsafe { ff_early_exit(err_code as u32) } + unreachable!("Expected termination of WASM exetution after a call to ff_early_exit.") +} diff --git a/packages/fuel-indexer-tests/Cargo.toml b/packages/fuel-indexer-tests/Cargo.toml index 9a5d3e119..18247601c 100644 --- a/packages/fuel-indexer-tests/Cargo.toml +++ b/packages/fuel-indexer-tests/Cargo.toml @@ -39,6 +39,7 @@ fuel-indexer-schema = { workspace = true, default-features = true } fuel-indexer-types = { workspace = true } fuel-indexer-utils = { workspace = true } fuel-tx = { workspace = true } +fuel-types = { workspace = true } fuels = { features = ["fuel-core-lib", "std"], version = "0.46" } fuels-macros = { version = "0.46", default-features = false } futures = "0.3" diff --git a/packages/fuel-indexer-tests/tests/service.rs b/packages/fuel-indexer-tests/tests/service.rs index 28c29add1..814dba91b 100644 --- a/packages/fuel-indexer-tests/tests/service.rs +++ b/packages/fuel-indexer-tests/tests/service.rs @@ -1,7 +1,10 @@ extern crate alloc; +use fuel_indexer::prelude::fuel::{BlockData, Consensus, Header}; use fuel_indexer::{Executor, IndexerConfig, WasmIndexExecutor}; +use fuel_indexer_lib::WasmIndexerError; use fuel_indexer_lib::{config::DatabaseConfig, manifest::Manifest}; use fuel_indexer_tests::fixtures::TestPostgresDb; +use fuel_types::Bytes32; use std::str::FromStr; #[tokio::test] @@ -85,3 +88,125 @@ async fn test_wasm_executor_can_meter_execution() { ), } } + +#[tokio::test] +async fn test_wasm_executor_error_codes() { + use async_std::{fs::File, io::ReadExt}; + + if let Ok(mut current_dir) = std::env::current_dir() { + if current_dir.ends_with("fuel-indexer-tests") { + current_dir.pop(); + current_dir.pop(); + } + + if let Err(e) = std::env::set_current_dir(current_dir) { + eprintln!("Failed to change directory: {}", e); + } + } + + let manifest = Manifest::from_file( + "packages/fuel-indexer-tests/indexers/fuel-indexer-test/fuel_indexer_test.yaml", + ) + .unwrap(); + + match &manifest.module() { + fuel_indexer_lib::manifest::Module::Wasm(ref module) => { + let mut bytes = Vec::::new(); + let mut file = File::open(module).await.unwrap(); + file.read_to_end(&mut bytes).await.unwrap(); + + let test_db = TestPostgresDb::new().await.unwrap(); + let pool = fuel_indexer_database::IndexerConnectionPool::Postgres( + test_db.pool.clone(), + ); + let config = IndexerConfig::default(); + + let schema_version = manifest + .graphql_schema_content() + .unwrap() + .version() + .to_string(); + + let mut executor = WasmIndexExecutor::new( + &config, + &manifest, + bytes.clone(), + pool, + schema_version, + ) + .await + .unwrap(); + + let block_1_header = Header { + id: Bytes32::zeroed(), + da_height: 1, + transactions_count: 0, + message_receipt_count: 0, + transactions_root: Bytes32::zeroed(), + message_receipt_root: Bytes32::zeroed(), + height: 1, + prev_root: Bytes32::zeroed(), + time: 0, + application_hash: Bytes32::zeroed(), + }; + + let block_1 = BlockData { + height: 1, + id: Bytes32::zeroed(), + header: block_1_header, + producer: None, + time: 0, + consensus: Consensus::Unknown, + transactions: vec![], + }; + + let blocks: Vec = vec![block_1]; + + // Test 1 + + // Since we are only starting the executor, and not the whole Fuel + // Indexer Servoce, the database tables are not initialized, and any + // database operation performed by the executor will fail. + if let Err(e) = executor.handle_events(blocks.clone()).await { + if let fuel_indexer::IndexerError::RuntimeError(e) = e { + match e.downcast::() { + Ok(err_code) => { + assert_eq!(err_code, WasmIndexerError::DatabaseError); + } + Err(e) => { + panic!("Expected a WASM exit code but got: {e:?}"); + } + } + } else { + panic!("Expected a RuntimeError but got: {e:?}"); + } + } + + // Test 2 + + // Trigger kill switch. + + executor + .kill_switch() + .store(true, std::sync::atomic::Ordering::SeqCst); + + if let Err(e) = executor.handle_events(blocks.clone()).await { + if let fuel_indexer::IndexerError::RuntimeError(e) = e { + match e.downcast::() { + Ok(err_code) => { + assert_eq!(err_code, WasmIndexerError::KillSwitch); + } + Err(e) => { + panic!("Expected a WASM exit code but got: {e:?}"); + } + } + } else { + panic!("Expected a RuntimeError but got: {e:?}"); + } + } + } + _ => panic!( + "Expected a WASM module in the manifest but got a Native module instead." + ), + } +} diff --git a/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_multi_type.rs b/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_multi_type.rs index 6fd0a8cb5..7a6196747 100644 --- a/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_multi_type.rs +++ b/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_multi_type.rs @@ -10,6 +10,9 @@ fn ff_put_object(_inp: ()) {} #[no_mangle] fn ff_put_many_to_many_record(_inp: ()) {} +#[no_mangle] +fn ff_early_exit(_inp: ()) {} + #[indexer(manifest = "packages/fuel-indexer-tests/trybuild/simple_wasm.yaml")] mod indexer { fn function_one(event: SomeEvent) { diff --git a/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_single_type.rs b/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_single_type.rs index dc6cd2e88..c097f1e24 100644 --- a/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_single_type.rs +++ b/packages/fuel-indexer-tests/trybuild/pass_if_indexer_is_valid_single_type.rs @@ -10,6 +10,9 @@ fn ff_put_object(_inp: ()) {} #[no_mangle] fn ff_put_many_to_many_record(_inp: ()) {} +#[no_mangle] +fn ff_early_exit(_inp: ()) {} + #[indexer(manifest = "packages/fuel-indexer-tests/trybuild/simple_wasm.yaml")] mod indexer { fn function_one(event: SomeEvent) { diff --git a/packages/fuel-indexer-tests/trybuild/pass_if_unsupported_types_are_used.rs b/packages/fuel-indexer-tests/trybuild/pass_if_unsupported_types_are_used.rs index 09524cbd5..bcc01f01b 100644 --- a/packages/fuel-indexer-tests/trybuild/pass_if_unsupported_types_are_used.rs +++ b/packages/fuel-indexer-tests/trybuild/pass_if_unsupported_types_are_used.rs @@ -10,6 +10,9 @@ fn ff_put_object(_inp: ()) {} #[no_mangle] fn ff_put_many_to_many_record(_inp: ()) {} +#[no_mangle] +fn ff_early_exit(_inp: ()) {} + #[indexer(manifest = "packages/fuel-indexer-tests/trybuild/simple_wasm.yaml")] mod indexer { fn function_one(event: SomeEvent) { diff --git a/packages/fuel-indexer/src/database.rs b/packages/fuel-indexer/src/database.rs index 242d0deed..efde48828 100644 --- a/packages/fuel-indexer/src/database.rs +++ b/packages/fuel-indexer/src/database.rs @@ -1,5 +1,7 @@ -use crate::{IndexerConfig, IndexerResult, Manifest}; -use fuel_indexer_database::{queries, IndexerConnection, IndexerConnectionPool}; +use crate::{IndexerConfig, IndexerError, IndexerResult, Manifest}; +use fuel_indexer_database::{ + queries, IndexerConnection, IndexerConnectionPool, IndexerDatabaseError, +}; use fuel_indexer_lib::{ fully_qualified_namespace, graphql::types::IdCol, utils::format_sql_query, }; @@ -64,29 +66,36 @@ impl Database { let conn = self.pool.acquire().await?; self.stashed = Some(conn); debug!("Connection stashed as: {:?}", self.stashed); - let conn = self.stashed.as_mut().expect( - "No stashed connection for start transaction. Was a transaction started?", - ); + let conn = + self.stashed + .as_mut() + .ok_or(crate::IndexerError::NoTransactionError( + "start_transaction".to_string(), + ))?; let result = queries::start_transaction(conn).await?; Ok(result) } /// Commit transaction to database. pub async fn commit_transaction(&mut self) -> IndexerResult { - let conn = self - .stashed - .as_mut() - .expect("No stashed connection for commit. Was a transaction started?"); + let conn = + self.stashed + .as_mut() + .ok_or(crate::IndexerError::NoTransactionError( + "commit_transaction".to_string(), + ))?; let res = queries::commit_transaction(conn).await?; Ok(res) } /// Revert open transaction. pub async fn revert_transaction(&mut self) -> IndexerResult { - let conn = self - .stashed - .as_mut() - .expect("No stashed connection for revert. Was a transaction started?"); + let conn = + self.stashed + .as_mut() + .ok_or(crate::IndexerError::NoTransactionError( + "revert_transaction".to_string(), + ))?; let res = queries::revert_transaction(conn).await?; Ok(res) } @@ -132,11 +141,11 @@ impl Database { type_id: i64, columns: Vec, bytes: Vec, - ) { + ) -> IndexerResult<()> { let table = match self.tables.get(&type_id) { Some(t) => t, None => { - error!( + return Err(IndexerError::Unknown(format!( r#"TypeId({type_id}) not found in tables: {:?}. Does the schema version in SchemaManager::new_schema match the schema version in Database::load_schema? @@ -145,8 +154,7 @@ Do your WASM modules need to be rebuilt? "#, self.tables, - ); - return; + ))); } }; @@ -165,15 +173,15 @@ Do your WASM modules need to be rebuilt? let conn = self .stashed .as_mut() - .expect("No stashed connection for put. Was a transaction started?"); + .ok_or(IndexerError::NoTransactionError("put_object".to_string()))?; if self.config.verbose { info!("{query_text}"); } - if let Err(e) = queries::put_object(conn, query_text, bytes).await { - error!("Failed to put_object: {e:?}"); - } + queries::put_object(conn, query_text, bytes).await?; + + Ok(()) } /// Get an object from the database. @@ -181,23 +189,25 @@ Do your WASM modules need to be rebuilt? &mut self, type_id: i64, object_id: String, - ) -> Option> { - let table = &self.tables[&type_id]; + ) -> IndexerResult>> { + let table = &self + .tables + .get(&type_id) + .ok_or(IndexerDatabaseError::TableMappingDoesNotExist(type_id))?; let query = self.get_query(table, &object_id); let conn = self .stashed .as_mut() - .expect("No stashed connection for get. Was a transaction started?"); - + .ok_or(IndexerError::NoTransactionError("get_object".to_string()))?; match queries::get_object(conn, query).await { - Ok(v) => Some(v), + Ok(v) => Ok(Some(v)), Err(e) => { if let sqlx::Error::RowNotFound = e { debug!("Row not found for object ID: {object_id}"); } else { error!("Failed to get_object: {e:?}"); } - None + Ok(None) } } } @@ -261,20 +271,25 @@ Do your WASM modules need to be rebuilt? /// /// There are multiple queries here because a single parent `TypeDefinition` can have several /// many-to-many relationships with children `TypeDefinition`s. - pub async fn put_many_to_many_record(&mut self, queries: Vec) { + pub async fn put_many_to_many_record( + &mut self, + queries: Vec, + ) -> IndexerResult<()> { let conn = self .stashed .as_mut() - .expect("No stashed connection for put. Was a transaction started?"); + .ok_or(IndexerError::NoTransactionError( + "put_many_to_many_record".to_string(), + ))?; for query in queries { if self.config.verbose { info!("{query}"); } - if let Err(e) = queries::put_many_to_many_record(conn, query).await { - error!("Failed to put_many_to_many_record: {e:?}"); - } + queries::put_many_to_many_record(conn, query).await?; } + + Ok(()) } } diff --git a/packages/fuel-indexer/src/executor.rs b/packages/fuel-indexer/src/executor.rs index 975fb5ac1..ee97c84a1 100644 --- a/packages/fuel-indexer/src/executor.rs +++ b/packages/fuel-indexer/src/executor.rs @@ -12,7 +12,9 @@ use fuel_core_client::client::{ FuelClient, }; use fuel_indexer_database::IndexerConnectionPool; -use fuel_indexer_lib::{defaults::*, manifest::Manifest, utils::serialize}; +use fuel_indexer_lib::{ + defaults::*, manifest::Manifest, utils::serialize, WasmIndexerError, +}; use fuel_indexer_types::{ fuel::{field::*, *}, scalar::{Bytes32, HexString}, @@ -188,6 +190,12 @@ pub fn run_executor( // The client responded with actual blocks, so attempt to index them. let result = executor.handle_events(block_info).await; + // If the kill switch has been triggered, the executor exits early. + if executor.kill_switch().load(Ordering::SeqCst) { + info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); + break; + } + if let Err(e) = result { // Run time metering is deterministic. There is no point in retrying. if let IndexerError::RunTimeLimitExceededError = e { @@ -511,6 +519,10 @@ pub struct IndexEnv { /// Reference to the connected database. pub db: Arc>, + + /// Kill switch for this indexer. When true, the indexer service indicated + /// that the indexer is being terminated. + pub kill_switch: Arc, } impl IndexEnv { @@ -519,6 +531,7 @@ impl IndexEnv { pool: IndexerConnectionPool, manifest: &Manifest, config: &IndexerConfig, + kill_switch: Arc, ) -> IndexerResult { let db = Database::new(pool, manifest, config).await; Ok(IndexEnv { @@ -526,6 +539,7 @@ impl IndexEnv { alloc: None, dealloc: None, db: Arc::new(Mutex::new(db)), + kill_switch, }) } } @@ -673,7 +687,9 @@ impl WasmIndexExecutor { compiler_config.push_middleware(metering); } - let idx_env = IndexEnv::new(pool, manifest, config).await?; + let kill_switch = Arc::new(AtomicBool::new(false)); + + let idx_env = IndexEnv::new(pool, manifest, config, kill_switch.clone()).await?; let db: Arc> = idx_env.db.clone(); @@ -727,8 +743,6 @@ impl WasmIndexExecutor { db.lock().await.load_schema(schema_version).await?; - let kill_switch = Arc::new(AtomicBool::new(false)); - Ok(WasmIndexExecutor { instance, _module: module, @@ -871,7 +885,19 @@ impl Executor for WasmIndexExecutor { self.db.lock().await.revert_transaction().await?; return Err(IndexerError::RunTimeLimitExceededError); } else { - error!("WasmIndexExecutor({uid}) WASM execution failed: {e:?}."); + if let Some(e) = e.downcast_ref::() { + match e { + // Termination due to kill switch is an expected behavior. + WasmIndexerError::KillSwitch => { + info!("Indexer({uid}) WASM execution terminated: {e}.") + } + _ => { + error!("Indexer({uid}) WASM execution failed: {e}.") + } + } + } else { + error!("Indexer({uid}) WASM execution failed: {e:?}."); + }; self.db.lock().await.revert_transaction().await?; return Err(IndexerError::from(e)); } diff --git a/packages/fuel-indexer/src/ffi.rs b/packages/fuel-indexer/src/ffi.rs index 0beecd840..be4b763c7 100644 --- a/packages/fuel-indexer/src/ffi.rs +++ b/packages/fuel-indexer/src/ffi.rs @@ -1,5 +1,5 @@ use async_std::sync::MutexGuard; -use fuel_indexer_lib::defaults; +use fuel_indexer_lib::{defaults, WasmIndexerError}; use fuel_indexer_schema::{join::RawQuery, FtColumn}; use fuel_indexer_types::ffi::{ LOG_LEVEL_DEBUG, LOG_LEVEL_ERROR, LOG_LEVEL_INFO, LOG_LEVEL_TRACE, LOG_LEVEL_WARN, @@ -70,8 +70,23 @@ fn get_object_id(mem: &MemoryView, ptr: u32, len: u32) -> FFIResult { } /// Log the string at the given pointer to stdout. -fn log_data(mut env: FunctionEnvMut, ptr: u32, len: u32, log_level: u32) { +fn log_data( + mut env: FunctionEnvMut, + ptr: u32, + len: u32, + log_level: u32, +) -> Result<(), WasmIndexerError> { let (idx_env, store) = env.data_and_store_mut(); + + if idx_env + .kill_switch + .load(std::sync::atomic::Ordering::SeqCst) + { + // If the kill switch has been flipped, returning an error will cause an + // early termination of WASM execution. + return Err(WasmIndexerError::KillSwitch); + } + let mem = idx_env .memory .as_mut() @@ -89,6 +104,8 @@ fn log_data(mut env: FunctionEnvMut, ptr: u32, len: u32, log_level: u3 LOG_LEVEL_TRACE => trace!("{log_string}",), l => panic!("Invalid log level: {l}"), } + + Ok(()) } /// Fetch the given type at the given pointer from memory. @@ -99,9 +116,18 @@ fn get_object( type_id: i64, ptr: u32, len_ptr: u32, -) -> u32 { +) -> Result { let (idx_env, mut store) = env.data_and_store_mut(); + if idx_env + .kill_switch + .load(std::sync::atomic::Ordering::SeqCst) + { + // If the kill switch has been flipped, returning an error will cause an + // early termination of WASM execution. + return Err(WasmIndexerError::KillSwitch); + } + let mem = idx_env .memory .as_mut() @@ -114,8 +140,12 @@ fn get_object( let id = get_object_id(&mem, ptr + offset, len + padding + offset).unwrap(); let rt = tokio::runtime::Handle::current(); - let bytes = - rt.block_on(async { idx_env.db.lock().await.get_object(type_id, id).await }); + let bytes = rt + .block_on(async { idx_env.db.lock().await.get_object(type_id, id).await }) + .map_err(|e| { + error!("Failed to get_object: {e}"); + WasmIndexerError::DatabaseError + })?; if let Some(bytes) = bytes { let alloc_fn = idx_env.alloc.as_mut().expect("Alloc export is missing."); @@ -138,22 +168,37 @@ fn get_object( mem.data_unchecked_mut()[range].copy_from_slice(&bytes); } - result + Ok(result) } else { - 0 + Ok(0) } } /// Put the given type at the given pointer into memory. /// /// This function is fallible, and will panic if the type cannot be saved. -fn put_object(mut env: FunctionEnvMut, type_id: i64, ptr: u32, len: u32) { +fn put_object( + mut env: FunctionEnvMut, + type_id: i64, + ptr: u32, + len: u32, +) -> Result<(), WasmIndexerError> { let (idx_env, store) = env.data_and_store_mut(); - let mem = idx_env - .memory - .as_mut() - .expect("Memory unitialized") - .view(&store); + + if idx_env + .kill_switch + .load(std::sync::atomic::Ordering::SeqCst) + { + // If the kill switch has been flipped, returning an error will cause an + // early termination of WASM execution. + return Err(WasmIndexerError::KillSwitch); + } + + let mem = if let Some(memory) = idx_env.memory.as_mut() { + memory.view(&store) + } else { + return Err(WasmIndexerError::UninitializedMemory); + }; let mut bytes = Vec::with_capacity(len as usize); let range = ptr as usize..ptr as usize + len as usize; @@ -166,12 +211,12 @@ fn put_object(mut env: FunctionEnvMut, type_id: i64, ptr: u32, len: u3 Ok(columns) => columns, Err(e) => { error!("Failed to deserialize Vec for put_object: {e:?}",); - return; + return Err(WasmIndexerError::DeserializationError); } }; let rt = tokio::runtime::Handle::current(); - rt.block_on(async { + let result = rt.block_on(async { idx_env .db .lock() @@ -179,18 +224,39 @@ fn put_object(mut env: FunctionEnvMut, type_id: i64, ptr: u32, len: u3 .put_object(type_id, columns, bytes) .await }); + + if let Err(e) = result { + error!("Failed to put_object: {e}"); + return Err(WasmIndexerError::DatabaseError); + }; + + Ok(()) } /// Execute the arbitrary query at the given pointer. /// /// This function is fallible, and will panic if the query cannot be executed. -fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32) { +fn put_many_to_many_record( + mut env: FunctionEnvMut, + ptr: u32, + len: u32, +) -> Result<(), WasmIndexerError> { let (idx_env, store) = env.data_and_store_mut(); - let mem = idx_env - .memory - .as_mut() - .expect("Memory unitialized") - .view(&store); + + if idx_env + .kill_switch + .load(std::sync::atomic::Ordering::SeqCst) + { + // If the kill switch has been flipped, returning an error will cause an + // early termination of WASM execution. + return Err(WasmIndexerError::KillSwitch); + } + + let mem = if let Some(memory) = idx_env.memory.as_mut() { + memory.view(&store) + } else { + return Err(WasmIndexerError::UninitializedMemory); + }; let mut bytes = Vec::with_capacity(len as usize); let range = ptr as usize..ptr as usize + len as usize; @@ -199,11 +265,16 @@ fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32 bytes.extend_from_slice(&mem.data_unchecked()[range]); } - let queries: Vec = - bincode::deserialize(&bytes).expect("Failed to deserialize queries"); - let queries = queries.iter().map(|q| q.to_string()).collect::>(); + let queries: Vec = match bincode::deserialize::>(&bytes) { + Ok(queries) => queries.iter().map(|q| q.to_string()).collect(), + Err(e) => { + error!("Failed to deserialize queries: {e:?}"); + return Err(WasmIndexerError::DeserializationError); + } + }; + let rt = tokio::runtime::Handle::current(); - rt.block_on(async { + let result = rt.block_on(async { idx_env .db .lock() @@ -211,6 +282,19 @@ fn put_many_to_many_record(mut env: FunctionEnvMut, ptr: u32, len: u32 .put_many_to_many_record(queries) .await }); + + if let Err(e) = result { + error!("Failed to put_many_to_many_record: {e:?}"); + return Err(WasmIndexerError::DatabaseError); + } + + Ok(()) +} + +/// When called from WASM it will terminate the execution and return the error +/// code. +pub fn early_exit(err_code: u32) -> Result<(), WasmIndexerError> { + Err(WasmIndexerError::from(err_code)) } /// Get the exports for the given store and environment. @@ -222,7 +306,9 @@ pub fn get_exports(store: &mut Store, env: &wasmer::FunctionEnv) -> Ex let f_log_data = Function::new_typed_with_env(store, env, log_data); let f_put_many_to_many_record = Function::new_typed_with_env(store, env, put_many_to_many_record); + let f_early_exit = Function::new_typed(store, early_exit); + exports.insert("ff_early_exit".to_string(), f_early_exit); exports.insert("ff_get_object".to_string(), f_get_obj); exports.insert("ff_put_object".to_string(), f_put_obj); exports.insert( diff --git a/packages/fuel-indexer/src/lib.rs b/packages/fuel-indexer/src/lib.rs index bb1554f7d..d02050b8a 100644 --- a/packages/fuel-indexer/src/lib.rs +++ b/packages/fuel-indexer/src/lib.rs @@ -68,8 +68,8 @@ pub enum IndexerError { HandlerError, #[error("Invalid port {0:?}")] InvalidPortNumber(#[from] core::num::ParseIntError), - #[error("No transaction is open.")] - NoTransactionError, + #[error("No open transaction for {0}. Was a transaction started?")] + NoTransactionError(String), #[error("{0}.")] Unknown(String), #[error("Indexer schema error: {0:?}")] diff --git a/plugins/forc-index/src/utils.rs b/plugins/forc-index/src/utils.rs index 3d0422050..5dc57c46b 100644 --- a/plugins/forc-index/src/utils.rs +++ b/plugins/forc-index/src/utils.rs @@ -136,7 +136,7 @@ pub fn cargo_workspace_root_dir( // Use serde to extract the "target_directory" field let target_directory = metadata_json["workspace_root"] .as_str() - .expect("target_directory not found or invalid"); + .expect("workspace_root not found or invalid"); Ok(target_directory.into()) }