Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
lostman committed Sep 11, 2023
1 parent e74645e commit 92e0139
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 36 deletions.
6 changes: 6 additions & 0 deletions packages/fuel-indexer-api-server/src/uses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ pub(crate) async fn remove_indexer(
}))
.await?;

// We have early termination on a kill switch. Yet, it is still possible for
// the database entries to be removed before the indexer has time to act on
// kill switch being triggered. Adding a small delay here should prevent
// unnecessary DatabaseError's appearing in the logs.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

// Allways remove data when removing an indexer.
if let Err(e) =
queries::remove_indexer(&mut conn, &namespace, &identifier, true).await
Expand Down
11 changes: 7 additions & 4 deletions packages/fuel-indexer-macros/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ impl From<ObjectDecoder> for TokenStream {
.map(|query| query.to_string())
.collect::<Vec<_>>();

d.lock().await.put_many_to_many_record(queries).await.unwrap();
d.lock().await.put_many_to_many_record(queries).await.expect(&format!("Entity::save_many_to_many for {} failed.", stringify!(#ident)));
}
}
None => {}
Expand All @@ -846,12 +846,15 @@ impl From<ObjectDecoder> for TokenStream {
match &db {
Some(d) => {
match d.lock().await.get_object(Self::TYPE_ID, id.to_string()).await {
Some(bytes) => {
Ok(Some(bytes)) => {
let columns: Vec<FtColumn> = bincode::deserialize(&bytes).expect("Failed to deserialize Vec<FtColumn> for Entity::load.");
let obj = Self::from_row(columns);
Some(obj)
},
None => None,
Ok(None) => None,
Err(e) => {
panic!("Entity::load for {} failed.", stringify!(#ident))
}
}
}
None => None,
Expand All @@ -868,7 +871,7 @@ impl From<ObjectDecoder> for TokenStream {
Self::TYPE_ID,
self.to_row(),
serialize(&self.to_row())
).await.unwrap();
).await.expect(&format!("Entity::save for {} failed.", stringify!(#ident)));
}
None => {},
}
Expand Down
6 changes: 2 additions & 4 deletions packages/fuel-indexer-plugin/src/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@ pub use hex::FromHex;
pub use sha2::{Digest, Sha256};
pub use std::collections::{HashMap, HashSet};

// All these methods have return type `Result<T, WasmIndexerError>`. `wasmer`
// uses the Err variant for ealy exit.
extern "C" {
// TODO: How do we want to return an error code for
// a function that returns a u32 but actually uses a u8?
fn ff_get_object(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8;
// log_data prints information to stdout.
fn ff_log_data(ptr: *const u8, len: u32, log_level: u32);
// Put methods have error codes.
fn ff_put_object(type_id: i64, ptr: *const u8, len: u32);
fn ff_put_many_to_many_record(ptr: *const u8, len: u32);
}
Expand Down
57 changes: 34 additions & 23 deletions packages/fuel-indexer/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{IndexerConfig, IndexerResult, Manifest};
use crate::{IndexerConfig, IndexerError, IndexerResult, Manifest};
use fuel_indexer_database::{queries, IndexerConnection, IndexerConnectionPool};
use fuel_indexer_lib::{
fully_qualified_namespace, graphql::types::IdCol, utils::format_sql_query,
Expand Down Expand Up @@ -64,28 +64,36 @@ impl Database {
let conn = self.pool.acquire().await?;
self.stashed = Some(conn);
debug!("Connection stashed as: {:?}", self.stashed);
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for start transaction. Was a transaction started?"
.to_string(),
))?;
let conn =
self.stashed
.as_mut()
.ok_or(crate::IndexerError::NoTransactionError(
"start_transaction".to_string(),
))?;
let result = queries::start_transaction(conn).await?;
Ok(result)
}

/// Commit transaction to database.
pub async fn commit_transaction(&mut self) -> IndexerResult<usize> {
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for commit. Was a transaction started?".to_string(),
))?;
let conn =
self.stashed
.as_mut()
.ok_or(crate::IndexerError::NoTransactionError(
"commit_transaction".to_string(),
))?;
let res = queries::commit_transaction(conn).await?;
Ok(res)
}

/// Revert open transaction.
pub async fn revert_transaction(&mut self) -> IndexerResult<usize> {
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for revert. Was a transaction started?".to_string(),
))?;
let conn =
self.stashed
.as_mut()
.ok_or(crate::IndexerError::NoTransactionError(
"revert_transaction".to_string(),
))?;
let res = queries::revert_transaction(conn).await?;
Ok(res)
}
Expand Down Expand Up @@ -135,7 +143,7 @@ impl Database {
let table = match self.tables.get(&type_id) {
Some(t) => t,
None => {
return Err(crate::IndexerError::Unknown(format!(
return Err(IndexerError::Unknown(format!(
r#"TypeId({type_id}) not found in tables: {:?}.
Does the schema version in SchemaManager::new_schema match the schema version in Database::load_schema?
Expand All @@ -160,9 +168,10 @@ Do your WASM modules need to be rebuilt?
let query_text =
format_sql_query(self.upsert_query(table, &columns, inserts, updates));

let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for put. Was a transaction started?".to_string(),
))?;
let conn = self
.stashed
.as_mut()
.ok_or(IndexerError::NoTransactionError("put_object".to_string()))?;

if self.config.verbose {
info!("{query_text}");
Expand All @@ -178,23 +187,22 @@ Do your WASM modules need to be rebuilt?
&mut self,
type_id: i64,
object_id: String,
) -> Option<Vec<u8>> {
) -> IndexerResult<Option<Vec<u8>>> {
let table = &self.tables[&type_id];
let query = self.get_query(table, &object_id);
let conn = self
.stashed
.as_mut()
.expect("No stashed connection for get. Was a transaction started?");

.ok_or(IndexerError::NoTransactionError("get_object".to_string()))?;
match queries::get_object(conn, query).await {
Ok(v) => Some(v),
Ok(v) => Ok(Some(v)),
Err(e) => {
if let sqlx::Error::RowNotFound = e {
debug!("Row not found for object ID: {object_id}");
} else {
error!("Failed to get_object: {e:?}");
}
None
Ok(None)
}
}
}
Expand Down Expand Up @@ -262,9 +270,12 @@ Do your WASM modules need to be rebuilt?
&mut self,
queries: Vec<String>,
) -> IndexerResult<()> {
let conn = self.stashed.as_mut().ok_or(crate::IndexerError::Unknown(
"No stashed connection for put. Was a transaction started?".to_string(),
))?;
let conn = self
.stashed
.as_mut()
.ok_or(IndexerError::NoTransactionError(
"put_many_to_many_record".to_string(),
))?;

for query in queries {
if self.config.verbose {
Expand Down
10 changes: 9 additions & 1 deletion packages/fuel-indexer/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,15 @@ impl Executor for WasmIndexExecutor {
.source()
.and_then(|e| e.downcast_ref::<WasmIndexerError>())
{
error!("Indexer({uid}) WASM execution failed: {e}.");
match e {
// Termination due to kill switch is an expected behavior.
WasmIndexerError::KillSwitch => {
info!("Indexer({uid}) WASM execution terminated: {e}.")
}
_ => {
error!("Indexer({uid}) WASM execution failed: {e}.")
}
}
} else {
error!("Indexer({uid}) WASM execution failed: {e:?}.");
};
Expand Down
8 changes: 6 additions & 2 deletions packages/fuel-indexer/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,12 @@ fn get_object(
let id = get_object_id(&mem, ptr + offset, len + padding + offset).unwrap();

let rt = tokio::runtime::Handle::current();
let bytes =
rt.block_on(async { idx_env.db.lock().await.get_object(type_id, id).await });
let bytes = rt
.block_on(async { idx_env.db.lock().await.get_object(type_id, id).await })
.map_err(|e| {
error!("Failed to get_object: {e}");
WasmIndexerError::DatabaseError
})?;

if let Some(bytes) = bytes {
let alloc_fn = idx_env.alloc.as_mut().expect("Alloc export is missing.");
Expand Down
4 changes: 2 additions & 2 deletions packages/fuel-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ pub enum IndexerError {
HandlerError,
#[error("Invalid port {0:?}")]
InvalidPortNumber(#[from] core::num::ParseIntError),
#[error("No transaction is open.")]
NoTransactionError,
#[error("No open transaction for {0}. Was a transaction started?")]
NoTransactionError(String),
#[error("{0}.")]
Unknown(String),
#[error("Indexer schema error: {0:?}")]
Expand Down

0 comments on commit 92e0139

Please sign in to comment.