Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
early exit and kill switch
Browse files Browse the repository at this point in the history
  • Loading branch information
lostman committed Sep 7, 2023
1 parent 21e2e2f commit e74645e
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 76 deletions.
4 changes: 3 additions & 1 deletion packages/fuel-indexer-benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use fuel_indexer::{
use fuel_indexer_database::IndexerConnectionPool;
use fuel_indexer_lib::config::DatabaseConfig;
use fuel_indexer_tests::fixtures::TestPostgresDb;
use std::{str::FromStr, sync::atomic::AtomicBool};
use std::{str::FromStr, sync::atomic::AtomicBool, sync::Arc};

/// Location of Fuel node to be used for block retrieval.
pub const NODE_URL: &str = "beta-4.fuel.network:80";
Expand Down Expand Up @@ -40,6 +40,7 @@ async fn setup_wasm_executor(
db_url: String,
pool: IndexerConnectionPool,
) -> Result<WasmIndexExecutor, ()> {
let kill_switch = Arc::new(AtomicBool::new(false));
config.database = DatabaseConfig::from_str(&db_url).unwrap();
let schema_version = manifest
.graphql_schema_content()
Expand All @@ -52,6 +53,7 @@ async fn setup_wasm_executor(
manifest.module_bytes().unwrap(),
pool,
schema_version,
kill_switch,
)
.await
.expect("Could not setup WASM executor");
Expand Down
10 changes: 10 additions & 0 deletions packages/fuel-indexer-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub enum WasmIndexerError {
UnableToSaveListType,
UninitializedMemory,
UnableToFetchLogString,
KillSwitch,
DatabaseError,
GeneralError,
}

Expand All @@ -57,6 +59,8 @@ impl From<i32> for WasmIndexerError {
4 => Self::UnableToSaveListType,
5 => Self::UninitializedMemory,
6 => Self::UnableToFetchLogString,
7 => Self::KillSwitch,
8 => Self::DatabaseError,
_ => Self::GeneralError,
}
}
Expand All @@ -83,6 +87,12 @@ impl std::fmt::Display for WasmIndexerError {
Self::UnableToFetchLogString => {
write!(f, "Failed to fetch log string")
}
Self::KillSwitch => {
write!(f, "Kill switch has been triggered")
}
Self::DatabaseError => {
write!(f, "Failed performing a database operation")
}
Self::GeneralError => write!(f, "A WASM error occurred"),
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/fuel-indexer-macros/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ impl From<ObjectDecoder> for TokenStream {
.map(|query| query.to_string())
.collect::<Vec<_>>();

d.lock().await.put_many_to_many_record(queries).await;
d.lock().await.put_many_to_many_record(queries).await.unwrap();
}
}
None => {}
Expand Down Expand Up @@ -868,7 +868,7 @@ impl From<ObjectDecoder> for TokenStream {
Self::TYPE_ID,
self.to_row(),
serialize(&self.to_row())
).await;
).await.unwrap();
}
None => {},
}
Expand Down
25 changes: 7 additions & 18 deletions packages/fuel-indexer-plugin/src/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ extern "C" {
// log_data prints information to stdout.
fn ff_log_data(ptr: *const u8, len: u32, log_level: u32);
// Put methods have error codes.
fn ff_put_object(type_id: i64, ptr: *const u8, len: u32) -> i32;
fn ff_put_many_to_many_record(ptr: *const u8, len: u32) -> i32;
fn ff_put_object(type_id: i64, ptr: *const u8, len: u32);
fn ff_put_many_to_many_record(ptr: *const u8, len: u32);
}

// TODO: more to do here, hook up to 'impl log::Log for Logger'
Expand Down Expand Up @@ -75,7 +75,7 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug {
}

/// Saves a record that contains a list of multiple elements.
fn save_many_to_many(&self) -> Result<(), WasmIndexerError> {
fn save_many_to_many(&self) {
if let Some(meta) = Self::JOIN_METADATA {
let items = meta.iter().filter_map(|x| x.clone()).collect::<Vec<_>>();
let row = self.to_row();
Expand All @@ -86,16 +86,8 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug {
.collect::<Vec<_>>();
let bytes = serialize(&queries);
unsafe {
let res = ff_put_many_to_many_record(bytes.as_ptr(), bytes.len() as u32);

if res != 0 {
return Err(WasmIndexerError::UnableToSaveListType);
}

Ok(())
ff_put_many_to_many_record(bytes.as_ptr(), bytes.len() as u32);
}
} else {
Ok(())
}
}

Expand Down Expand Up @@ -135,17 +127,14 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug {

/// Saves a record.
fn save(&self) {
self.save_unsafe().unwrap()
self.save_unsafe()
}

/// Saves a record through the FFI with the WASM runtime and checks for errors.
fn save_unsafe(&self) -> Result<(), WasmIndexerError> {
fn save_unsafe(&self) {
unsafe {
let buf = serialize(&self.to_row());
let res = ff_put_object(Self::TYPE_ID, buf.as_ptr(), buf.len() as u32);
if res != 0 {
return Err(WasmIndexerError::from(res));
}
ff_put_object(Self::TYPE_ID, buf.as_ptr(), buf.len() as u32);
}

self.save_many_to_many()
Expand Down
4 changes: 3 additions & 1 deletion packages/fuel-indexer-tests/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use fuel_indexer::{Executor, IndexerConfig, WasmIndexExecutor};
use fuel_indexer_lib::{config::DatabaseConfig, manifest::Manifest};
use fuel_indexer_tests::fixtures::TestPostgresDb;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::{atomic::AtomicBool, Arc};

#[tokio::test]
async fn test_wasm_executor_can_meter_execution() {
Expand Down Expand Up @@ -48,12 +48,14 @@ async fn test_wasm_executor_can_meter_execution() {
.version()
.to_string();

let kill_switch = Arc::new(AtomicBool::new(false));
let mut executor = WasmIndexExecutor::new(
&config,
&manifest,
bytes.clone(),
pool,
schema_version,
kill_switch,
)
.await
.unwrap();
Expand Down
59 changes: 29 additions & 30 deletions packages/fuel-indexer/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,28 @@ impl Database {
let conn = self.pool.acquire().await?;
self.stashed = Some(conn);
debug!("Connection stashed as: {:?}", self.stashed);
let conn = self.stashed.as_mut().expect(
"No stashed connection for start transaction. Was a transaction started?",
);
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for start transaction. Was a transaction started?"
.to_string(),
))?;
let result = queries::start_transaction(conn).await?;
Ok(result)
}

/// Commit transaction to database.
pub async fn commit_transaction(&mut self) -> IndexerResult<usize> {
let conn = self
.stashed
.as_mut()
.expect("No stashed connection for commit. Was a transaction started?");
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for commit. Was a transaction started?".to_string(),
))?;
let res = queries::commit_transaction(conn).await?;
Ok(res)
}

/// Revert open transaction.
pub async fn revert_transaction(&mut self) -> IndexerResult<usize> {
let conn = self
.stashed
.as_mut()
.expect("No stashed connection for revert. Was a transaction started?");
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for revert. Was a transaction started?".to_string(),
))?;
let res = queries::revert_transaction(conn).await?;
Ok(res)
}
Expand Down Expand Up @@ -132,11 +131,11 @@ impl Database {
type_id: i64,
columns: Vec<FtColumn>,
bytes: Vec<u8>,
) {
) -> IndexerResult<()> {
let table = match self.tables.get(&type_id) {
Some(t) => t,
None => {
error!(
return Err(crate::IndexerError::Unknown(format!(
r#"TypeId({type_id}) not found in tables: {:?}.
Does the schema version in SchemaManager::new_schema match the schema version in Database::load_schema?
Expand All @@ -145,8 +144,7 @@ Do your WASM modules need to be rebuilt?
"#,
self.tables,
);
return;
)));
}
};

Expand All @@ -162,18 +160,17 @@ Do your WASM modules need to be rebuilt?
let query_text =
format_sql_query(self.upsert_query(table, &columns, inserts, updates));

let conn = self
.stashed
.as_mut()
.expect("No stashed connection for put. Was a transaction started?");
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for put. Was a transaction started?".to_string(),
))?;

if self.config.verbose {
info!("{query_text}");
}

if let Err(e) = queries::put_object(conn, query_text, bytes).await {
error!("Failed to put_object: {e:?}");
}
queries::put_object(conn, query_text, bytes).await?;

Ok(())
}

/// Get an object from the database.
Expand Down Expand Up @@ -261,20 +258,22 @@ Do your WASM modules need to be rebuilt?
///
/// There are multiple queries here because a single parent `TypeDefinition` can have several
/// many-to-many relationships with children `TypeDefinition`s.
pub async fn put_many_to_many_record(&mut self, queries: Vec<String>) {
let conn = self
.stashed
.as_mut()
.expect("No stashed connection for put. Was a transaction started?");
pub async fn put_many_to_many_record(
&mut self,
queries: Vec<String>,
) -> IndexerResult<()> {
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for put. Was a transaction started?".to_string(),
))?;

for query in queries {
if self.config.verbose {
info!("{query}");
}

if let Err(e) = queries::put_many_to_many_record(conn, query).await {
error!("Failed to put_many_to_many_record: {e:?}");
}
queries::put_many_to_many_record(conn, query).await?;
}

Ok(())
}
}
Loading

0 comments on commit e74645e

Please sign in to comment.