diff --git a/packages/fuel-indexer-database/postgres/src/lib.rs b/packages/fuel-indexer-database/postgres/src/lib.rs index e02701c68..1fafd6b0b 100644 --- a/packages/fuel-indexer-database/postgres/src/lib.rs +++ b/packages/fuel-indexer-database/postgres/src/lib.rs @@ -633,7 +633,8 @@ pub async fn indexer_assets( }) } -/// Return the last block height that the given indexer has indexed. +/// Return the last block height that the given indexer has indexed. If the +/// indexer indexed no blocks, the result is 0. #[cfg_attr(feature = "metrics", metrics)] pub async fn last_block_height_for_indexer( conn: &mut PoolConnection, @@ -649,7 +650,7 @@ pub async fn last_block_height_for_indexer( Ok(row .try_get::(0) .map(|id| id.to_u32().expect("Bad block height.")) - .unwrap_or_else(|_e| 1)) + .unwrap_or(0)) } // TODO: https://github.com/FuelLabs/fuel-indexer/issues/251 @@ -911,3 +912,37 @@ pub async fn put_many_to_many_record( execute_query(conn, query).await?; Ok(()) } + +pub async fn create_ensure_block_height_consecutive_trigger( + conn: &mut PoolConnection, + namespace: &str, + identifier: &str, +) -> sqlx::Result<()> { + let trigger_function = "CREATE OR REPLACE FUNCTION ensure_block_height_consecutive() + RETURNS TRIGGER AS $$ + DECLARE + block_height integer; + BEGIN + EXECUTE format('SELECT MAX(block_height) FROM %I.%I', TG_TABLE_SCHEMA, TG_TABLE_NAME) INTO block_height; + + IF NEW.block_height IS NOT NULL AND block_height IS NOT NULL AND NEW.block_height != block_height + 1 THEN + RAISE EXCEPTION '%.%: attempted to insert value with block_height = % while last indexed block_height = %. block_height values must be consecutive.', TG_TABLE_SCHEMA, TG_TABLE_NAME, NEW.block_height, block_height; + END IF; + + RETURN NEW; + END; + $$ LANGUAGE plpgsql;".to_string(); + + execute_query(conn, trigger_function).await.unwrap(); + + let trigger = format!( + "CREATE TRIGGER trigger_ensure_block_height_consecutive + BEFORE INSERT OR UPDATE ON {namespace}_{identifier}.indexmetadataentity + FOR EACH ROW + EXECUTE FUNCTION ensure_block_height_consecutive();" + ); + + execute_query(conn, trigger).await?; + + Ok(()) +} diff --git a/packages/fuel-indexer-database/src/lib.rs b/packages/fuel-indexer-database/src/lib.rs index 1774b888a..2a351b2e4 100644 --- a/packages/fuel-indexer-database/src/lib.rs +++ b/packages/fuel-indexer-database/src/lib.rs @@ -37,6 +37,14 @@ pub enum IndexerConnection { Postgres(Box>), } +impl IndexerConnection { + pub fn database_type(&self) -> DbType { + match self { + IndexerConnection::Postgres(_) => DbType::Postgres, + } + } +} + #[derive(Clone, Debug)] pub enum IndexerConnectionPool { Postgres(sqlx::Pool), diff --git a/packages/fuel-indexer-database/src/queries.rs b/packages/fuel-indexer-database/src/queries.rs index 39fae7644..dbbd38d47 100644 --- a/packages/fuel-indexer-database/src/queries.rs +++ b/packages/fuel-indexer-database/src/queries.rs @@ -409,3 +409,18 @@ pub async fn put_many_to_many_record( } } } + +pub async fn create_ensure_block_height_consecutive_trigger( + conn: &mut IndexerConnection, + namespace: &str, + identifier: &str, +) -> sqlx::Result<()> { + match conn { + IndexerConnection::Postgres(ref mut c) => { + postgres::create_ensure_block_height_consecutive_trigger( + c, namespace, identifier, + ) + .await + } + } +} diff --git a/packages/fuel-indexer-lib/src/lib.rs b/packages/fuel-indexer-lib/src/lib.rs index 3eacc8d82..e246c79c0 100644 --- a/packages/fuel-indexer-lib/src/lib.rs +++ b/packages/fuel-indexer-lib/src/lib.rs @@ -45,6 +45,7 @@ pub enum WasmIndexerError { UninitializedMemory, KillSwitch, DatabaseError, + MissingBlocksError, GeneralError, } @@ -56,8 +57,9 @@ impl From for WasmIndexerError { 2 => Self::PutObjectError, 3 => Self::UnableToSaveListType, 4 => Self::UninitializedMemory, - 6 => Self::KillSwitch, - 7 => Self::DatabaseError, + 5 => Self::KillSwitch, + 6 => Self::DatabaseError, + 7 => Self::MissingBlocksError, _ => Self::GeneralError, } } @@ -88,7 +90,10 @@ impl std::fmt::Display for WasmIndexerError { ) } Self::DatabaseError => { - write!(f, "Database operation failed.") + write!(f, "Failed performing a database operation") + } + Self::MissingBlocksError => { + write!(f, "Some blocks are missing") } Self::GeneralError => write!(f, "Some unspecified WASM error occurred."), } diff --git a/packages/fuel-indexer-schema/src/db/tables.rs b/packages/fuel-indexer-schema/src/db/tables.rs index b43cbe186..0417e069e 100644 --- a/packages/fuel-indexer-schema/src/db/tables.rs +++ b/packages/fuel-indexer-schema/src/db/tables.rs @@ -187,6 +187,13 @@ impl IndexerSchema { queries::execute_query(conn, stmnt.to_owned()).await?; } + queries::create_ensure_block_height_consecutive_trigger( + conn, + &self.namespace, + &self.identifier, + ) + .await?; + self.tables = tables; Ok(self) diff --git a/packages/fuel-indexer-tests/tests/indexing.rs b/packages/fuel-indexer-tests/tests/indexing.rs index fdac32699..d4fc97910 100644 --- a/packages/fuel-indexer-tests/tests/indexing.rs +++ b/packages/fuel-indexer-tests/tests/indexing.rs @@ -604,25 +604,36 @@ async fn test_start_block() { ref manifest, .. } = setup_indexing_test_components(None).await; - let mut conn = fuel_indexer_database::IndexerConnection::Postgres(Box::new( db.pool.acquire().await.unwrap(), )); + // Allow the indexer to start and process blocks. + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let start = fuel_indexer::get_start_block(&mut conn, &manifest) .await .unwrap(); - assert_eq!(start, 1); + // setup_indexing_test_components deploys a contract, so one block exists. + // The indexer should have processed that block. The start block is + // therefore 2 (if we started from 1, the indexer would have processed it + // twice). + assert_eq!(start, 2); + // We create two more blocks, 2, and 3, which are processed by the indexer. mock_request("/block").await; mock_request("/block").await; + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let start = fuel_indexer::get_start_block(&mut conn, &manifest) .await .unwrap(); - assert_eq!(start, 3); + // The next start block is therefore 4. The indexer should have processed + // blocks 1, 2, and 3. + assert_eq!(start, 4); } #[actix_web::test] @@ -648,3 +659,63 @@ async fn test_generics() { "aaaasdfsdfasdfsdfaasdfsdfasdfsdf" ); } + +#[actix_web::test] +async fn test_no_missing_blocks() { + let IndexingTestComponents { + ref db, + ref manifest, + .. + } = setup_indexing_test_components(None).await; + + let mut conn = fuel_indexer_database::IndexerConnection::Postgres(Box::new( + db.pool.acquire().await.unwrap(), + )); + + // Allow the indexer to start and process blocks. + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + mock_request("/block").await; + mock_request("/block").await; + + let start = fuel_indexer_database::queries::last_block_height_for_indexer( + &mut conn, + &manifest.namespace(), + &manifest.identifier(), + ) + .await + .unwrap(); + + assert_eq!(start, 3); + + // Remove the last item from indexmetadataentity, simulating missing a block. + let mut conn_2 = db.pool.acquire().await.unwrap(); + sqlx::query( + "DELETE FROM fuel_indexer_test_index1.indexmetadataentity WHERE block_height = 3", + ) + .execute(&mut conn_2) + .await + .unwrap(); + + // Trigger more blocks. The indexer will receive blocks 4 and 5. However, + // due to a missing item from indexmetadataentity, the indexer can't + // progress. + mock_request("/block").await; + mock_request("/block").await; + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // last_block_height_for_indexer fetches MAX(block_height) from + // indexmetadataentity. Thus, if the indexer processed blocks 4 and 5, the + // value would be 5. Since the DB trigger stopped the indexer from + // progressing, and since we've deleted one row, the expected value is 2. + let start = fuel_indexer_database::queries::last_block_height_for_indexer( + &mut conn, + &manifest.namespace(), + &manifest.identifier(), + ) + .await + .unwrap(); + + assert_eq!(start, 2); +} diff --git a/packages/fuel-indexer-tests/tests/web_server.rs b/packages/fuel-indexer-tests/tests/web_server.rs index 570654274..a46027f81 100644 --- a/packages/fuel-indexer-tests/tests/web_server.rs +++ b/packages/fuel-indexer-tests/tests/web_server.rs @@ -323,6 +323,9 @@ async fn test_replacing_an_indexer_and_keeping_or_removing_data() { let mut conn = IndexerConnection::Postgres(Box::new(db.pool.acquire().await.unwrap())); + // Allow the indexer to start and process blocks. + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let last = last_block_height_for_indexer( &mut conn, manifest.namespace(), @@ -359,6 +362,9 @@ async fn test_replacing_an_indexer_and_keeping_or_removing_data() { .await .unwrap(); + // Allow the old indexer to shut down and the new indexer to start. + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let last = last_block_height_for_indexer( &mut conn, manifest.namespace(), @@ -386,8 +392,8 @@ async fn test_replacing_an_indexer_and_keeping_or_removing_data() { .unwrap(); // We removed the data, and the new indexer sould not start until block 5 is - // present. The last block must be 1. - assert_eq!(last, 1); + // present. The last block must be 0. + assert_eq!(last, 0); // Increase blocks to 5. The new indexer should start. mock_request("/block").await; diff --git a/packages/fuel-indexer/src/executor.rs b/packages/fuel-indexer/src/executor.rs index 381e0e3a9..f7220c946 100644 --- a/packages/fuel-indexer/src/executor.rs +++ b/packages/fuel-indexer/src/executor.rs @@ -199,6 +199,16 @@ pub fn run_executor( } if let Err(e) = result { + if let IndexerError::RuntimeError(ref e) = e { + if let Some(&WasmIndexerError::MissingBlocksError) = + e.downcast_ref::() + { + error!( + "Indexer({indexer_uid}) terminating due to missing blocks." + ); + break; + } + } // Run time metering is deterministic. There is no point in retrying. if let IndexerError::RunTimeLimitExceededError = e { error!("Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points"); diff --git a/packages/fuel-indexer/src/ffi.rs b/packages/fuel-indexer/src/ffi.rs index be4b763c7..6cb957563 100644 --- a/packages/fuel-indexer/src/ffi.rs +++ b/packages/fuel-indexer/src/ffi.rs @@ -227,7 +227,7 @@ fn put_object( if let Err(e) = result { error!("Failed to put_object: {e}"); - return Err(WasmIndexerError::DatabaseError); + return Err(database_operation_failure(e)); }; Ok(()) @@ -285,12 +285,33 @@ fn put_many_to_many_record( if let Err(e) = result { error!("Failed to put_many_to_many_record: {e:?}"); - return Err(WasmIndexerError::DatabaseError); + return Err(database_operation_failure(e)); } Ok(()) } +// Returns a specialized error code when the database trigger, which ensures +// indexers can't miss blocks, raises an exception. Otherwise, returns an error +// code indicating a generic database operation failure. +fn database_operation_failure(e: crate::IndexerError) -> WasmIndexerError { + match e { + crate::IndexerError::SqlxError(e) => { + if let Some(e) = e.as_database_error() { + if let Some(e) = e.try_downcast_ref::() { + if let Some(source) = e.r#where() { + if source.contains("PL/pgSQL function ensure_block_height_consecutive() line 8 at RAISE") { + return WasmIndexerError::MissingBlocksError + } + } + } + } + WasmIndexerError::DatabaseError + } + _ => WasmIndexerError::DatabaseError, + } +} + /// When called from WASM it will terminate the execution and return the error /// code. pub fn early_exit(err_code: u32) -> Result<(), WasmIndexerError> { diff --git a/packages/fuel-indexer/src/service.rs b/packages/fuel-indexer/src/service.rs index 78c8646db..248b9a10c 100644 --- a/packages/fuel-indexer/src/service.rs +++ b/packages/fuel-indexer/src/service.rs @@ -63,6 +63,10 @@ impl IndexerService { mut manifest: Manifest, remove_data: bool, ) -> IndexerResult<()> { + if let Some(killer) = self.killers.get(&manifest.uid()) { + killer.store(true, std::sync::atomic::Ordering::SeqCst); + } + let mut conn = self.pool.acquire().await?; let indexer_exists = (queries::get_indexer_id( @@ -372,7 +376,10 @@ pub async fn get_start_block( .await?; let start = manifest.start_block().unwrap_or(last); let block = if *resumable { - std::cmp::max(start, last) + // If the last processed block is N, we want to resume from N+1. + // A database trigger prevents the indexer from processing the + // same block twice. + std::cmp::max(start, last + 1) } else { start };