From 88d388b02e5d48fc82f2113f886333891054dd3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Mon, 11 Sep 2023 18:29:05 +0200 Subject: [PATCH] review feedback --- packages/fuel-indexer-macros/src/decoder.rs | 4 +- packages/fuel-indexer/src/database.rs | 68 +++++++++++---------- packages/fuel-indexer/src/executor.rs | 10 ++- packages/fuel-indexer/src/ffi.rs | 50 +++++++-------- packages/fuel-indexer/src/lib.rs | 4 +- 5 files changed, 73 insertions(+), 63 deletions(-) diff --git a/packages/fuel-indexer-macros/src/decoder.rs b/packages/fuel-indexer-macros/src/decoder.rs index 88bcfdb81..76d2e7c5e 100644 --- a/packages/fuel-indexer-macros/src/decoder.rs +++ b/packages/fuel-indexer-macros/src/decoder.rs @@ -846,12 +846,12 @@ impl From for TokenStream { match &db { Some(d) => { match d.lock().await.get_object(Self::TYPE_ID, id.to_string()).await { - Some(bytes) => { + Ok(bytes) => { let columns: Vec = bincode::deserialize(&bytes).expect("Failed to deserialize Vec for Entity::load."); let obj = Self::from_row(columns); Some(obj) }, - None => None, + Err(_) => None, } } None => None, diff --git a/packages/fuel-indexer/src/database.rs b/packages/fuel-indexer/src/database.rs index c0a6c91d7..8d4dd12e6 100644 --- a/packages/fuel-indexer/src/database.rs +++ b/packages/fuel-indexer/src/database.rs @@ -1,11 +1,11 @@ -use crate::{IndexerConfig, IndexerResult, Manifest}; +use crate::{IndexerConfig, IndexerError, IndexerResult, Manifest}; use fuel_indexer_database::{queries, IndexerConnection, IndexerConnectionPool}; use fuel_indexer_lib::{ fully_qualified_namespace, graphql::types::IdCol, utils::format_sql_query, }; use fuel_indexer_schema::FtColumn; use std::collections::HashMap; -use tracing::{debug, error, info}; +use tracing::{debug, info}; /// Database for an executor instance, with schema info. #[derive(Debug)] @@ -64,28 +64,36 @@ impl Database { let conn = self.pool.acquire().await?; self.stashed = Some(conn); debug!("Connection stashed as: {:?}", self.stashed); - let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( - "No stashed connection for start transaction. Was a transaction started?" - .to_string(), - ))?; + let conn = + self.stashed + .as_mut() + .ok_or(crate::IndexerError::NoTransactionError( + "start_transaction".to_string(), + ))?; let result = queries::start_transaction(conn).await?; Ok(result) } /// Commit transaction to database. pub async fn commit_transaction(&mut self) -> IndexerResult { - let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( - "No stashed connection for commit. Was a transaction started?".to_string(), - ))?; + let conn = + self.stashed + .as_mut() + .ok_or(crate::IndexerError::NoTransactionError( + "commit_transaction".to_string(), + ))?; let res = queries::commit_transaction(conn).await?; Ok(res) } /// Revert open transaction. pub async fn revert_transaction(&mut self) -> IndexerResult { - let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( - "No stashed connection for revert. Was a transaction started?".to_string(), - ))?; + let conn = + self.stashed + .as_mut() + .ok_or(crate::IndexerError::NoTransactionError( + "revert_transaction".to_string(), + ))?; let res = queries::revert_transaction(conn).await?; Ok(res) } @@ -135,7 +143,7 @@ impl Database { let table = match self.tables.get(&type_id) { Some(t) => t, None => { - return Err(crate::IndexerError::Unknown(format!( + return Err(IndexerError::Unknown(format!( r#"TypeId({type_id}) not found in tables: {:?}. Does the schema version in SchemaManager::new_schema match the schema version in Database::load_schema? @@ -160,9 +168,10 @@ Do your WASM modules need to be rebuilt? let query_text = format_sql_query(self.upsert_query(table, &columns, inserts, updates)); - let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( - "No stashed connection for put. Was a transaction started?".to_string(), - ))?; + let conn = self + .stashed + .as_mut() + .ok_or(IndexerError::NoTransactionError("put_object".to_string()))?; if self.config.verbose { info!("{query_text}"); @@ -178,25 +187,15 @@ Do your WASM modules need to be rebuilt? &mut self, type_id: i64, object_id: String, - ) -> Option> { + ) -> IndexerResult> { let table = &self.tables[&type_id]; let query = self.get_query(table, &object_id); let conn = self .stashed .as_mut() - .expect("No stashed connection for get. Was a transaction started?"); - - match queries::get_object(conn, query).await { - Ok(v) => Some(v), - Err(e) => { - if let sqlx::Error::RowNotFound = e { - debug!("Row not found for object ID: {object_id}"); - } else { - error!("Failed to get_object: {e:?}"); - } - None - } - } + .ok_or(IndexerError::NoTransactionError("get_object".to_string()))?; + let result = queries::get_object(conn, query).await?; + Ok(result) } /// Load the schema for this indexer from the database, and build a mapping of `TypeId`s to tables. @@ -262,9 +261,12 @@ Do your WASM modules need to be rebuilt? &mut self, queries: Vec, ) -> IndexerResult<()> { - let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown( - "No stashed connection for put. Was a transaction started?".to_string(), - ))?; + let conn = self + .stashed + .as_mut() + .ok_or(IndexerError::NoTransactionError( + "put_many_to_many_record".to_string(), + ))?; for query in queries { if self.config.verbose { diff --git a/packages/fuel-indexer/src/executor.rs b/packages/fuel-indexer/src/executor.rs index 914a28c61..250b3a6cd 100644 --- a/packages/fuel-indexer/src/executor.rs +++ b/packages/fuel-indexer/src/executor.rs @@ -934,7 +934,15 @@ impl Executor for WasmIndexExecutor { .source() .and_then(|e| e.downcast_ref::()) { - error!("Indexer({uid}) WASM execution failed: {e}."); + match e { + // Termination due to kill switch is an expected behavior. + WasmIndexerError::KillSwitch => { + info!("Indexer({uid}) WASM execution terminated: {e}.") + } + _ => { + error!("Indexer({uid}) WASM execution failed: {e}.") + } + } } else { error!("Indexer({uid}) WASM execution failed: {e:?}."); }; diff --git a/packages/fuel-indexer/src/ffi.rs b/packages/fuel-indexer/src/ffi.rs index 59a1121c7..4a0450775 100644 --- a/packages/fuel-indexer/src/ffi.rs +++ b/packages/fuel-indexer/src/ffi.rs @@ -140,34 +140,34 @@ fn get_object( let id = get_object_id(&mem, ptr + offset, len + padding + offset).unwrap(); let rt = tokio::runtime::Handle::current(); - let bytes = - rt.block_on(async { idx_env.db.lock().await.get_object(type_id, id).await }); - - if let Some(bytes) = bytes { - let alloc_fn = idx_env.alloc.as_mut().expect("Alloc export is missing."); - - let size = bytes.len() as u32; - let result = alloc_fn.call(&mut store, size).expect("Alloc failed."); - let range = result as usize..result as usize + size as usize; - - let mem = idx_env - .memory - .as_mut() - .expect("Memory unitialized.") - .view(&store); - WasmPtr::::new(len_ptr) - .deref(&mem) - .write(size) - .expect("Failed to write length to memory."); + let bytes = rt + .block_on(async { idx_env.db.lock().await.get_object(type_id, id).await }) + .map_err(|e| { + error!("Failed to put_object: {e}"); + WasmIndexerError::DatabaseError + })?; - unsafe { - mem.data_unchecked_mut()[range].copy_from_slice(&bytes); - } + let alloc_fn = idx_env.alloc.as_mut().expect("Alloc export is missing."); - Ok(result) - } else { - Ok(0) + let size = bytes.len() as u32; + let result = alloc_fn.call(&mut store, size).expect("Alloc failed."); + let range = result as usize..result as usize + size as usize; + + let mem = idx_env + .memory + .as_mut() + .expect("Memory unitialized.") + .view(&store); + WasmPtr::::new(len_ptr) + .deref(&mem) + .write(size) + .expect("Failed to write length to memory."); + + unsafe { + mem.data_unchecked_mut()[range].copy_from_slice(&bytes); } + + Ok(result) } /// Put the given type at the given pointer into memory. diff --git a/packages/fuel-indexer/src/lib.rs b/packages/fuel-indexer/src/lib.rs index bb1554f7d..d02050b8a 100644 --- a/packages/fuel-indexer/src/lib.rs +++ b/packages/fuel-indexer/src/lib.rs @@ -68,8 +68,8 @@ pub enum IndexerError { HandlerError, #[error("Invalid port {0:?}")] InvalidPortNumber(#[from] core::num::ParseIntError), - #[error("No transaction is open.")] - NoTransactionError, + #[error("No open transaction for {0}. Was a transaction started?")] + NoTransactionError(String), #[error("{0}.")] Unknown(String), #[error("Indexer schema error: {0:?}")]