Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
enhancement: don't allow missing blocks (#1349)
Browse files Browse the repository at this point in the history
* Implement error codes for FFI WASM functions

* early exit and kill switch

* review feedback

* update comment

* add early_exit FFI function and handle errors in load

* fmt

* add wasm exit codes test and fix an expect msg

* fmt

* cargo sort

* Update packages/fuel-indexer-lib/src/lib.rs

Co-authored-by: rashad <[email protected]>

* Update packages/fuel-indexer-lib/src/lib.rs

Co-authored-by: rashad <[email protected]>

* Update packages/fuel-indexer-lib/src/lib.rs

Co-authored-by: rashad <[email protected]>

* more review feedback

* improve ealy_exit function

* add database trigger to ensure indexers cannot miss any blocks

* handle missing blocks as a speacial case of database error

* add missing block test

* adjust get_start_block to be compatible with the trigger

* update test

* adjust last_block_height_for_indexer

* more test fixes

* remove unused variant that was re-introduced in a merge

* clearer comment

* handle trigger more like other sql statements

* move trigger function from migration to rust

---------

Co-authored-by: Alexander Decurnou <[email protected]>
Co-authored-by: rashad <[email protected]>
  • Loading branch information
3 people authored Sep 21, 2023
1 parent 2424dd7 commit abb8abf
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 13 deletions.
39 changes: 37 additions & 2 deletions packages/fuel-indexer-database/postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,8 @@ pub async fn indexer_assets(
})
}

/// Return the last block height that the given indexer has indexed.
/// Return the last block height that the given indexer has indexed. If the
/// indexer indexed no blocks, the result is 0.
#[cfg_attr(feature = "metrics", metrics)]
pub async fn last_block_height_for_indexer(
conn: &mut PoolConnection<Postgres>,
Expand All @@ -649,7 +650,7 @@ pub async fn last_block_height_for_indexer(
Ok(row
.try_get::<i32, usize>(0)
.map(|id| id.to_u32().expect("Bad block height."))
.unwrap_or_else(|_e| 1))
.unwrap_or(0))
}

// TODO: https://github.com/FuelLabs/fuel-indexer/issues/251
Expand Down Expand Up @@ -911,3 +912,37 @@ pub async fn put_many_to_many_record(
execute_query(conn, query).await?;
Ok(())
}

pub async fn create_ensure_block_height_consecutive_trigger(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
) -> sqlx::Result<()> {
let trigger_function = "CREATE OR REPLACE FUNCTION ensure_block_height_consecutive()
RETURNS TRIGGER AS $$
DECLARE
block_height integer;
BEGIN
EXECUTE format('SELECT MAX(block_height) FROM %I.%I', TG_TABLE_SCHEMA, TG_TABLE_NAME) INTO block_height;
IF NEW.block_height IS NOT NULL AND block_height IS NOT NULL AND NEW.block_height != block_height + 1 THEN
RAISE EXCEPTION '%.%: attempted to insert value with block_height = % while last indexed block_height = %. block_height values must be consecutive.', TG_TABLE_SCHEMA, TG_TABLE_NAME, NEW.block_height, block_height;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;".to_string();

execute_query(conn, trigger_function).await.unwrap();

let trigger = format!(
"CREATE TRIGGER trigger_ensure_block_height_consecutive
BEFORE INSERT OR UPDATE ON {namespace}_{identifier}.indexmetadataentity
FOR EACH ROW
EXECUTE FUNCTION ensure_block_height_consecutive();"
);

execute_query(conn, trigger).await?;

Ok(())
}
8 changes: 8 additions & 0 deletions packages/fuel-indexer-database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ pub enum IndexerConnection {
Postgres(Box<PoolConnection<sqlx::Postgres>>),
}

impl IndexerConnection {
pub fn database_type(&self) -> DbType {
match self {
IndexerConnection::Postgres(_) => DbType::Postgres,
}
}
}

#[derive(Clone, Debug)]
pub enum IndexerConnectionPool {
Postgres(sqlx::Pool<sqlx::Postgres>),
Expand Down
15 changes: 15 additions & 0 deletions packages/fuel-indexer-database/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,18 @@ pub async fn put_many_to_many_record(
}
}
}

pub async fn create_ensure_block_height_consecutive_trigger(
conn: &mut IndexerConnection,
namespace: &str,
identifier: &str,
) -> sqlx::Result<()> {
match conn {
IndexerConnection::Postgres(ref mut c) => {
postgres::create_ensure_block_height_consecutive_trigger(
c, namespace, identifier,
)
.await
}
}
}
11 changes: 8 additions & 3 deletions packages/fuel-indexer-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub enum WasmIndexerError {
UninitializedMemory,
KillSwitch,
DatabaseError,
MissingBlocksError,
GeneralError,
}

Expand All @@ -56,8 +57,9 @@ impl From<u32> for WasmIndexerError {
2 => Self::PutObjectError,
3 => Self::UnableToSaveListType,
4 => Self::UninitializedMemory,
6 => Self::KillSwitch,
7 => Self::DatabaseError,
5 => Self::KillSwitch,
6 => Self::DatabaseError,
7 => Self::MissingBlocksError,
_ => Self::GeneralError,
}
}
Expand Down Expand Up @@ -88,7 +90,10 @@ impl std::fmt::Display for WasmIndexerError {
)
}
Self::DatabaseError => {
write!(f, "Database operation failed.")
write!(f, "Failed performing a database operation")
}
Self::MissingBlocksError => {
write!(f, "Some blocks are missing")
}
Self::GeneralError => write!(f, "Some unspecified WASM error occurred."),
}
Expand Down
7 changes: 7 additions & 0 deletions packages/fuel-indexer-schema/src/db/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ impl IndexerSchema {
queries::execute_query(conn, stmnt.to_owned()).await?;
}

queries::create_ensure_block_height_consecutive_trigger(
conn,
&self.namespace,
&self.identifier,
)
.await?;

self.tables = tables;

Ok(self)
Expand Down
77 changes: 74 additions & 3 deletions packages/fuel-indexer-tests/tests/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,25 +604,36 @@ async fn test_start_block() {
ref manifest,
..
} = setup_indexing_test_components(None).await;

let mut conn = fuel_indexer_database::IndexerConnection::Postgres(Box::new(
db.pool.acquire().await.unwrap(),
));

// Allow the indexer to start and process blocks.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

let start = fuel_indexer::get_start_block(&mut conn, &manifest)
.await
.unwrap();

assert_eq!(start, 1);
// setup_indexing_test_components deploys a contract, so one block exists.
// The indexer should have processed that block. The start block is
// therefore 2 (if we started from 1, the indexer would have processed it
// twice).
assert_eq!(start, 2);

// We create two more blocks, 2, and 3, which are processed by the indexer.
mock_request("/block").await;
mock_request("/block").await;

tokio::time::sleep(std::time::Duration::from_secs(2)).await;

let start = fuel_indexer::get_start_block(&mut conn, &manifest)
.await
.unwrap();

assert_eq!(start, 3);
// The next start block is therefore 4. The indexer should have processed
// blocks 1, 2, and 3.
assert_eq!(start, 4);
}

#[actix_web::test]
Expand All @@ -648,3 +659,63 @@ async fn test_generics() {
"aaaasdfsdfasdfsdfaasdfsdfasdfsdf"
);
}

#[actix_web::test]
async fn test_no_missing_blocks() {
let IndexingTestComponents {
ref db,
ref manifest,
..
} = setup_indexing_test_components(None).await;

let mut conn = fuel_indexer_database::IndexerConnection::Postgres(Box::new(
db.pool.acquire().await.unwrap(),
));

// Allow the indexer to start and process blocks.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

mock_request("/block").await;
mock_request("/block").await;

let start = fuel_indexer_database::queries::last_block_height_for_indexer(
&mut conn,
&manifest.namespace(),
&manifest.identifier(),
)
.await
.unwrap();

assert_eq!(start, 3);

// Remove the last item from indexmetadataentity, simulating missing a block.
let mut conn_2 = db.pool.acquire().await.unwrap();
sqlx::query(
"DELETE FROM fuel_indexer_test_index1.indexmetadataentity WHERE block_height = 3",
)
.execute(&mut conn_2)
.await
.unwrap();

// Trigger more blocks. The indexer will receive blocks 4 and 5. However,
// due to a missing item from indexmetadataentity, the indexer can't
// progress.
mock_request("/block").await;
mock_request("/block").await;

tokio::time::sleep(std::time::Duration::from_secs(1)).await;

// last_block_height_for_indexer fetches MAX(block_height) from
// indexmetadataentity. Thus, if the indexer processed blocks 4 and 5, the
// value would be 5. Since the DB trigger stopped the indexer from
// progressing, and since we've deleted one row, the expected value is 2.
let start = fuel_indexer_database::queries::last_block_height_for_indexer(
&mut conn,
&manifest.namespace(),
&manifest.identifier(),
)
.await
.unwrap();

assert_eq!(start, 2);
}
10 changes: 8 additions & 2 deletions packages/fuel-indexer-tests/tests/web_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ async fn test_replacing_an_indexer_and_keeping_or_removing_data() {
let mut conn =
IndexerConnection::Postgres(Box::new(db.pool.acquire().await.unwrap()));

// Allow the indexer to start and process blocks.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

let last = last_block_height_for_indexer(
&mut conn,
manifest.namespace(),
Expand Down Expand Up @@ -359,6 +362,9 @@ async fn test_replacing_an_indexer_and_keeping_or_removing_data() {
.await
.unwrap();

// Allow the old indexer to shut down and the new indexer to start.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

let last = last_block_height_for_indexer(
&mut conn,
manifest.namespace(),
Expand Down Expand Up @@ -386,8 +392,8 @@ async fn test_replacing_an_indexer_and_keeping_or_removing_data() {
.unwrap();

// We removed the data, and the new indexer sould not start until block 5 is
// present. The last block must be 1.
assert_eq!(last, 1);
// present. The last block must be 0.
assert_eq!(last, 0);

// Increase blocks to 5. The new indexer should start.
mock_request("/block").await;
Expand Down
10 changes: 10 additions & 0 deletions packages/fuel-indexer/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,16 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
}

if let Err(e) = result {
if let IndexerError::RuntimeError(ref e) = e {
if let Some(&WasmIndexerError::MissingBlocksError) =
e.downcast_ref::<WasmIndexerError>()
{
error!(
"Indexer({indexer_uid}) terminating due to missing blocks."
);
break;
}
}
// Run time metering is deterministic. There is no point in retrying.
if let IndexerError::RunTimeLimitExceededError = e {
error!("Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points");
Expand Down
25 changes: 23 additions & 2 deletions packages/fuel-indexer/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ fn put_object(

if let Err(e) = result {
error!("Failed to put_object: {e}");
return Err(WasmIndexerError::DatabaseError);
return Err(database_operation_failure(e));
};

Ok(())
Expand Down Expand Up @@ -285,12 +285,33 @@ fn put_many_to_many_record(

if let Err(e) = result {
error!("Failed to put_many_to_many_record: {e:?}");
return Err(WasmIndexerError::DatabaseError);
return Err(database_operation_failure(e));
}

Ok(())
}

// Returns a specialized error code when the database trigger, which ensures
// indexers can't miss blocks, raises an exception. Otherwise, returns an error
// code indicating a generic database operation failure.
fn database_operation_failure(e: crate::IndexerError) -> WasmIndexerError {
match e {
crate::IndexerError::SqlxError(e) => {
if let Some(e) = e.as_database_error() {
if let Some(e) = e.try_downcast_ref::<sqlx::postgres::PgDatabaseError>() {
if let Some(source) = e.r#where() {
if source.contains("PL/pgSQL function ensure_block_height_consecutive() line 8 at RAISE") {
return WasmIndexerError::MissingBlocksError
}
}
}
}
WasmIndexerError::DatabaseError
}
_ => WasmIndexerError::DatabaseError,
}
}

/// When called from WASM it will terminate the execution and return the error
/// code.
pub fn early_exit(err_code: u32) -> Result<(), WasmIndexerError> {
Expand Down
9 changes: 8 additions & 1 deletion packages/fuel-indexer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ impl IndexerService {
mut manifest: Manifest,
remove_data: bool,
) -> IndexerResult<()> {
if let Some(killer) = self.killers.get(&manifest.uid()) {
killer.store(true, std::sync::atomic::Ordering::SeqCst);
}

let mut conn = self.pool.acquire().await?;

let indexer_exists = (queries::get_indexer_id(
Expand Down Expand Up @@ -372,7 +376,10 @@ pub async fn get_start_block(
.await?;
let start = manifest.start_block().unwrap_or(last);
let block = if *resumable {
std::cmp::max(start, last)
// If the last processed block is N, we want to resume from N+1.
// A database trigger prevents the indexer from processing the
// same block twice.
std::cmp::max(start, last + 1)
} else {
start
};
Expand Down

0 comments on commit abb8abf

Please sign in to comment.