Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

enhancement: WASM error codes, early exit on error or kill switch #1337

Merged
merged 23 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/fuel-indexer-benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use fuel_indexer::{
use fuel_indexer_database::IndexerConnectionPool;
use fuel_indexer_lib::config::DatabaseConfig;
use fuel_indexer_tests::fixtures::TestPostgresDb;
use std::{str::FromStr, sync::atomic::AtomicBool};
use std::{str::FromStr, sync::atomic::AtomicBool, sync::Arc};

/// Location of Fuel node to be used for block retrieval.
pub const NODE_URL: &str = "beta-4.fuel.network:80";
Expand Down Expand Up @@ -40,6 +40,7 @@ async fn setup_wasm_executor(
db_url: String,
pool: IndexerConnectionPool,
) -> Result<WasmIndexExecutor, ()> {
let kill_switch = Arc::new(AtomicBool::new(false));
config.database = DatabaseConfig::from_str(&db_url).unwrap();
let schema_version = manifest
.graphql_schema_content()
Expand All @@ -52,6 +53,7 @@ async fn setup_wasm_executor(
manifest.module_bytes().unwrap(),
pool,
schema_version,
kill_switch,
)
.await
.expect("Could not setup WASM executor");
Expand Down
64 changes: 64 additions & 0 deletions packages/fuel-indexer-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,70 @@ impl ExecutionSource {
}
}

#[derive(Debug, Clone, Copy)]
pub enum WasmIndexerError {
DeserializationError = 1,
SerializationError,
PutObjectError,
UnableToSaveListType,
UninitializedMemory,
UnableToFetchLogString,
KillSwitch,
DatabaseError,
GeneralError,
}

impl From<i32> for WasmIndexerError {
fn from(value: i32) -> Self {
match value {
0 => unreachable!("WasmIndexerError index starts at 1"),
lostman marked this conversation as resolved.
Show resolved Hide resolved
1 => Self::DeserializationError,
2 => Self::SerializationError,
3 => Self::PutObjectError,
4 => Self::UnableToSaveListType,
5 => Self::UninitializedMemory,
6 => Self::UnableToFetchLogString,
7 => Self::KillSwitch,
8 => Self::DatabaseError,
_ => Self::GeneralError,
}
}
}

impl std::fmt::Display for WasmIndexerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SerializationError => {
write!(f, "Failed to serialize")
lostman marked this conversation as resolved.
Show resolved Hide resolved
}
Self::DeserializationError => {
write!(f, "Failed to deserialize")
lostman marked this conversation as resolved.
Show resolved Hide resolved
}
Self::UnableToSaveListType => {
write!(f, "Failed to save list")
}
Self::PutObjectError => {
write!(f, "Failed to save object")
}
Self::UninitializedMemory => {
write!(f, "Failed to create MemoryView for indexer")
}
Self::UnableToFetchLogString => {
write!(f, "Failed to fetch log string")
lostman marked this conversation as resolved.
Show resolved Hide resolved
}
Self::KillSwitch => {
write!(f, "Kill switch has been triggered")
lostman marked this conversation as resolved.
Show resolved Hide resolved
}
Self::DatabaseError => {
write!(f, "Failed performing a database operation")
lostman marked this conversation as resolved.
Show resolved Hide resolved
}
Self::GeneralError => write!(f, "A WASM error occurred"),
lostman marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

impl std::error::Error for WasmIndexerError {}
lostman marked this conversation as resolved.
Show resolved Hide resolved

/// Return a fully qualified indexer namespace.
pub fn fully_qualified_namespace(namespace: &str, identifier: &str) -> String {
format!("{}_{}", namespace, identifier)
Expand Down
4 changes: 2 additions & 2 deletions packages/fuel-indexer-macros/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ impl From<ObjectDecoder> for TokenStream {
.map(|query| query.to_string())
.collect::<Vec<_>>();

d.lock().await.put_many_to_many_record(queries).await;
d.lock().await.put_many_to_many_record(queries).await.unwrap();
lostman marked this conversation as resolved.
Show resolved Hide resolved
}
}
None => {}
Expand Down Expand Up @@ -868,7 +868,7 @@ impl From<ObjectDecoder> for TokenStream {
Self::TYPE_ID,
self.to_row(),
serialize(&self.to_row())
).await;
).await.unwrap();
lostman marked this conversation as resolved.
Show resolved Hide resolved
}
None => {},
}
Expand Down
57 changes: 48 additions & 9 deletions packages/fuel-indexer-plugin/src/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use alloc::vec::Vec;
use fuel_indexer_lib::{
graphql::MAX_FOREIGN_KEY_LIST_FIELDS,
utils::{deserialize, serialize},
WasmIndexerError,
};
use fuel_indexer_schema::{
join::{JoinMetadata, RawQuery},
Expand All @@ -17,11 +18,14 @@ pub use sha2::{Digest, Sha256};
pub use std::collections::{HashMap, HashSet};

extern "C" {
// TODO: error codes? or just panic and let the runtime handle it?
// TODO: How do we want to return an error code for
lostman marked this conversation as resolved.
Show resolved Hide resolved
// a function that returns a u32 but actually uses a u8?
fn ff_get_object(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8;
// log_data prints information to stdout.
fn ff_log_data(ptr: *const u8, len: u32, log_level: u32);
// Put methods have error codes.
fn ff_put_object(type_id: i64, ptr: *const u8, len: u32);
fn ff_put_many_to_many_record(ptr: *const u8, len: u32);
fn ff_log_data(ptr: *const u8, len: u32, log_level: u32);
}

// TODO: more to do here, hook up to 'impl log::Log for Logger'
Expand Down Expand Up @@ -49,18 +53,28 @@ impl Logger {
}
}

/// Trait for a type entity.
lostman marked this conversation as resolved.
Show resolved Hide resolved
///
/// Any entity type that will be processed through a WASM indexer is required to implement this trait.
pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug {
/// Unique identifier for a type.
const TYPE_ID: i64;

/// Necessary metadata for saving an entity's list type fields.
const JOIN_METADATA: Option<[Option<JoinMetadata<'a>>; MAX_FOREIGN_KEY_LIST_FIELDS]>;

/// Convert database row representation into an instance of an entity.
fn from_row(vec: Vec<FtColumn>) -> Self;

/// Convert an instance of an entity into row representation for use in a database.
fn to_row(&self) -> Vec<FtColumn>;

/// Returns an entity's internal type ID.
fn type_id(&self) -> i64 {
Self::TYPE_ID
}

/// Saves a record that contains a list of multiple elements.
fn save_many_to_many(&self) {
if let Some(meta) = Self::JOIN_METADATA {
let items = meta.iter().filter_map(|x| x.clone()).collect::<Vec<_>>();
Expand All @@ -71,40 +85,64 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug {
.filter(|query| !query.is_empty())
.collect::<Vec<_>>();
let bytes = serialize(&queries);
unsafe { ff_put_many_to_many_record(bytes.as_ptr(), bytes.len() as u32) }
unsafe {
ff_put_many_to_many_record(bytes.as_ptr(), bytes.len() as u32);
}
}
}

/// Loads a record given a UID.
fn load(id: UID) -> Option<Self> {
Self::load_unsafe(id).unwrap()
lostman marked this conversation as resolved.
Show resolved Hide resolved
}

/// Loads a record through the FFI with the WASM runtime and checks for errors.
fn load_unsafe(id: UID) -> Result<Option<Self>, WasmIndexerError> {
unsafe {
let buff = bincode::serialize(&id.to_string()).unwrap();
let buff = if let Ok(bytes) = bincode::serialize(&id.to_string()) {
bytes
} else {
return Err(WasmIndexerError::SerializationError);
};

let mut bufflen = (buff.len() as u32).to_le_bytes();

let ptr = ff_get_object(Self::TYPE_ID, buff.as_ptr(), bufflen.as_mut_ptr());

if !ptr.is_null() {
let len = u32::from_le_bytes(bufflen) as usize;
let bytes = Vec::from_raw_parts(ptr, len, len);
let vec = deserialize(&bytes).expect("Bad serialization.");
let vec = if let Ok(v) = deserialize(&bytes) {
v
} else {
return Err(WasmIndexerError::DeserializationError);
};

return Some(Self::from_row(vec));
return Ok(Some(Self::from_row(vec)));
}

None
Ok(None)
}
}

/// Saves a record.
fn save(&self) {
self.save_unsafe()
}

/// Saves a record through the FFI with the WASM runtime and checks for errors.
fn save_unsafe(&self) {
unsafe {
let buf = serialize(&self.to_row());
ff_put_object(Self::TYPE_ID, buf.as_ptr(), buf.len() as u32)
ff_put_object(Self::TYPE_ID, buf.as_ptr(), buf.len() as u32);
}

self.save_many_to_many();
self.save_many_to_many()
}
}

#[no_mangle]
/// Allocation function to be called by an executor in a WASM runtime.
fn alloc_fn(size: u32) -> *const u8 {
let vec = Vec::with_capacity(size as usize);
let ptr = vec.as_ptr();
Expand All @@ -115,6 +153,7 @@ fn alloc_fn(size: u32) -> *const u8 {
}

#[no_mangle]
/// Deallocation function to be called by an executor in a WASM runtime.
fn dealloc_fn(ptr: *mut u8, len: usize) {
let _vec = unsafe { Vec::from_raw_parts(ptr, len, len) };
}
4 changes: 3 additions & 1 deletion packages/fuel-indexer-tests/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use fuel_indexer::{Executor, IndexerConfig, WasmIndexExecutor};
use fuel_indexer_lib::{config::DatabaseConfig, manifest::Manifest};
use fuel_indexer_tests::fixtures::TestPostgresDb;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::{atomic::AtomicBool, Arc};

#[tokio::test]
async fn test_wasm_executor_can_meter_execution() {
Expand Down Expand Up @@ -48,12 +48,14 @@ async fn test_wasm_executor_can_meter_execution() {
.version()
.to_string();

let kill_switch = Arc::new(AtomicBool::new(false));
let mut executor = WasmIndexExecutor::new(
&config,
&manifest,
bytes.clone(),
pool,
schema_version,
kill_switch,
)
.await
.unwrap();
Expand Down
59 changes: 29 additions & 30 deletions packages/fuel-indexer/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,28 @@ impl Database {
let conn = self.pool.acquire().await?;
self.stashed = Some(conn);
debug!("Connection stashed as: {:?}", self.stashed);
let conn = self.stashed.as_mut().expect(
"No stashed connection for start transaction. Was a transaction started?",
);
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
lostman marked this conversation as resolved.
Show resolved Hide resolved
"No stashed connection for start transaction. Was a transaction started?"
.to_string(),
))?;
let result = queries::start_transaction(conn).await?;
Ok(result)
}

/// Commit transaction to database.
pub async fn commit_transaction(&mut self) -> IndexerResult<usize> {
let conn = self
.stashed
.as_mut()
.expect("No stashed connection for commit. Was a transaction started?");
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for commit. Was a transaction started?".to_string(),
))?;
let res = queries::commit_transaction(conn).await?;
Ok(res)
}

/// Revert open transaction.
pub async fn revert_transaction(&mut self) -> IndexerResult<usize> {
let conn = self
.stashed
.as_mut()
.expect("No stashed connection for revert. Was a transaction started?");
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for revert. Was a transaction started?".to_string(),
))?;
let res = queries::revert_transaction(conn).await?;
Ok(res)
}
Expand Down Expand Up @@ -132,11 +131,11 @@ impl Database {
type_id: i64,
columns: Vec<FtColumn>,
bytes: Vec<u8>,
) {
) -> IndexerResult<()> {
let table = match self.tables.get(&type_id) {
Some(t) => t,
None => {
error!(
return Err(crate::IndexerError::Unknown(format!(
r#"TypeId({type_id}) not found in tables: {:?}.

Does the schema version in SchemaManager::new_schema match the schema version in Database::load_schema?
Expand All @@ -145,8 +144,7 @@ Do your WASM modules need to be rebuilt?

"#,
self.tables,
);
return;
)));
}
};

Expand All @@ -162,18 +160,17 @@ Do your WASM modules need to be rebuilt?
let query_text =
format_sql_query(self.upsert_query(table, &columns, inserts, updates));

let conn = self
.stashed
.as_mut()
.expect("No stashed connection for put. Was a transaction started?");
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for put. Was a transaction started?".to_string(),
))?;

if self.config.verbose {
info!("{query_text}");
}

if let Err(e) = queries::put_object(conn, query_text, bytes).await {
error!("Failed to put_object: {e:?}");
}
queries::put_object(conn, query_text, bytes).await?;

Ok(())
}

/// Get an object from the database.
Expand Down Expand Up @@ -261,20 +258,22 @@ Do your WASM modules need to be rebuilt?
///
/// There are multiple queries here because a single parent `TypeDefinition` can have several
/// many-to-many relationships with children `TypeDefinition`s.
pub async fn put_many_to_many_record(&mut self, queries: Vec<String>) {
let conn = self
.stashed
.as_mut()
.expect("No stashed connection for put. Was a transaction started?");
pub async fn put_many_to_many_record(
&mut self,
queries: Vec<String>,
) -> IndexerResult<()> {
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for put. Was a transaction started?".to_string(),
))?;

for query in queries {
if self.config.verbose {
info!("{query}");
}

if let Err(e) = queries::put_many_to_many_record(conn, query).await {
error!("Failed to put_many_to_many_record: {e:?}");
}
queries::put_many_to_many_record(conn, query).await?;
}

Ok(())
}
}
Loading