diff --git a/Cargo.lock b/Cargo.lock index 115a1dd3d..b02e049c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3298,6 +3298,7 @@ dependencies = [ "fuel-indexer-database-types", "fuel-indexer-lib", "fuel-indexer-postgres", + "fuel-indexer-types", "sqlx", "thiserror", "url", @@ -3434,6 +3435,7 @@ dependencies = [ "fuel-indexer-lib", "fuel-indexer-macro-utils", "fuel-indexer-metrics", + "fuel-indexer-types", "sqlx", "tracing", "uuid 1.4.1", diff --git a/docs/src/getting-started/indexer-service-infrastructure.md b/docs/src/getting-started/indexer-service-infrastructure.md index 13a5eda7d..062d00350 100644 --- a/docs/src/getting-started/indexer-service-infrastructure.md +++ b/docs/src/getting-started/indexer-service-infrastructure.md @@ -50,6 +50,9 @@ OPTIONS: --accept-sql-queries Allow the web server to accept raw SQL queries. + --allow-non-sequential-blocks + Allow missing blocks or non-sequential block processing. + --auth-enabled Require users to authenticate for some operations. @@ -57,7 +60,7 @@ OPTIONS: Authentication scheme used. --block-page-size - Amount of blocks to return in a request to a Fuel node. [default: 10] + Amount of blocks to return in a request to a Fuel node. [default: 20] -c, --config Indexer service config file. @@ -68,6 +71,10 @@ OPTIONS: --embedded-database Automatically create and start database using provided options or defaults. + --enable-block-store + Store blocks in the database and use these stored blocks to fast-forward an indexer + starting up. + --fuel-node-host Host of the running Fuel node. [default: localhost] @@ -133,6 +140,13 @@ OPTIONS: --rate-limit-window-size Number of seconds over which to allow --rate-limit-rps. + --remove-data + When replacing an indexer, also remove the indexed data. + + --remove-stored-blocks + Remove all stored blocks. Use this flag together with --enable-block-store to redownload + block data afresh. + --replace-indexer Whether to allow replacing an existing indexer. If not specified, an attempt to deploy over an existing indexer results in an error. @@ -163,6 +177,25 @@ OPTIONS: {{#include ../../../config.yaml}} ``` +### Local Block Store + +Since version `TODO`, the Fuel indexer service includes the `--enable-block-store` flag, which changes how the service fetches, persists, and processes blocks. + +The default behavior is that each indexer fetches its blocks directly from the Fuel node by making repeated HTTP calls. These blocks are not persisted. When a new indexer is deployed (or an indexer is redeployed and its data is removed via `--remove-data` flag), the new indexer starts from scratch. + +Setting `--enable-block-store` changes this behavior in a twofold manner: + +1. The blocks are fetched by a single process and stored in the database in the `index_block_data` table. +2. The indexers no longer make HTTP calls—they fetch blocks from the database instead. + +This means that when a new indexer is deployed, it can be fast-forwarded by using stored blocks. The benchmarks show that using stored blocks is 4-5x faster than fetching blocks from the Fuel node. + +The benchmarks also show that 4M blocks from `beta-4.fuel.network` take around `3GB` of space in the database. + +This feature is handy for local testing of an indexer. An in-development indexer can be redeployed, and hundreds of thousands of blocks can be processed very quickly. + +If, for whatever reason, removing stored blocks is desirable, it can be done with the `--remove-stored-blocks` flag. If enabled, the Fuel indexer service will, when starting up, remove all entries from the `index_block_data` table. + --- ## Web API Server diff --git a/packages/fuel-indexer-database/Cargo.toml b/packages/fuel-indexer-database/Cargo.toml index 73cc34ca8..6c2ab4b6f 100644 --- a/packages/fuel-indexer-database/Cargo.toml +++ b/packages/fuel-indexer-database/Cargo.toml @@ -13,6 +13,7 @@ description = "Fuel Indexer Database" fuel-indexer-database-types = { workspace = true } fuel-indexer-lib = { workspace = true } fuel-indexer-postgres = { workspace = true } +fuel-indexer-types = { workspace = true } sqlx = { version = "0.6" } thiserror = { workspace = true } url = "2.2" diff --git a/packages/fuel-indexer-database/postgres/Cargo.toml b/packages/fuel-indexer-database/postgres/Cargo.toml index cad581022..fe22fbf07 100644 --- a/packages/fuel-indexer-database/postgres/Cargo.toml +++ b/packages/fuel-indexer-database/postgres/Cargo.toml @@ -16,6 +16,7 @@ fuel-indexer-database-types = { workspace = true } fuel-indexer-lib = { workspace = true } fuel-indexer-macro-utils = { workspace = true, optional = true } fuel-indexer-metrics = { workspace = true, optional = true } +fuel-indexer-types = { workspace = true } sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "postgres", "offline", "time", "chrono", "bigdecimal"] } tracing = { workspace = true } uuid = { version = "1.3", features = ["v4"] } diff --git a/packages/fuel-indexer-database/postgres/migrations/20230824180932_block_data_and_block_count.down.sql b/packages/fuel-indexer-database/postgres/migrations/20230824180932_block_data_and_block_count.down.sql new file mode 100644 index 000000000..d150728d4 --- /dev/null +++ b/packages/fuel-indexer-database/postgres/migrations/20230824180932_block_data_and_block_count.down.sql @@ -0,0 +1,3 @@ +DROP TABLE index_block_data cascade; + +DROP FUNCTION ensure_block_height_consecutive; \ No newline at end of file diff --git a/packages/fuel-indexer-database/postgres/migrations/20230824180932_block_data_and_block_count.up.sql b/packages/fuel-indexer-database/postgres/migrations/20230824180932_block_data_and_block_count.up.sql new file mode 100644 index 000000000..a6e8c8c4b --- /dev/null +++ b/packages/fuel-indexer-database/postgres/migrations/20230824180932_block_data_and_block_count.up.sql @@ -0,0 +1,24 @@ +CREATE TABLE IF NOT EXISTS index_block_data ( + block_height INTEGER PRIMARY KEY NOT NULL, + block_data BYTEA NOT NULL +); + +CREATE OR REPLACE FUNCTION ensure_block_height_consecutive() +RETURNS TRIGGER AS $$ +DECLARE + block_height integer; +BEGIN + EXECUTE format('SELECT MAX(block_height) FROM %I.%I', TG_TABLE_SCHEMA, TG_TABLE_NAME) INTO block_height; + + IF NEW.block_height IS NOT NULL AND block_height IS NOT NULL AND NEW.block_height != block_height + 1 THEN + RAISE EXCEPTION '%.%: attempted to insert value with block_height = % while last indexed block_height = %. block_height values must be consecutive.', TG_TABLE_SCHEMA, TG_TABLE_NAME, NEW.block_height, block_height; + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER trigger_ensure_block_height_consecutive +BEFORE INSERT OR UPDATE ON index_block_data +FOR EACH ROW +EXECUTE FUNCTION ensure_block_height_consecutive(); \ No newline at end of file diff --git a/packages/fuel-indexer-database/postgres/src/lib.rs b/packages/fuel-indexer-database/postgres/src/lib.rs index d49ab269b..41bce5b39 100644 --- a/packages/fuel-indexer-database/postgres/src/lib.rs +++ b/packages/fuel-indexer-database/postgres/src/lib.rs @@ -3,6 +3,8 @@ use bigdecimal::ToPrimitive; use fuel_indexer_database_types::*; use fuel_indexer_lib::utils::sha256_digest; +use fuel_indexer_types::fuel::BlockData; +use sqlx::QueryBuilder; use sqlx::{pool::PoolConnection, postgres::PgRow, types::JsonValue, Postgres, Row}; use std::str::FromStr; use std::time::{SystemTime, UNIX_EPOCH}; @@ -496,6 +498,74 @@ pub async fn register_indexer( }) } +/// Save `BlockData` to the database. +pub async fn save_block_data( + conn: &mut PoolConnection, + blockdata: &[BlockData], +) -> sqlx::Result<()> { + if blockdata.is_empty() { + return Ok(()); + } + + let mut qb = + QueryBuilder::new("INSERT INTO index_block_data (block_height, block_data) "); + qb.push_values(blockdata, |mut b, bd| { + let height = bd.height as i32; + let block = fuel_indexer_lib::utils::serialize(bd); + b.push_bind(height); + b.push_bind(block); + }); + + qb.build().execute(conn).await?; + + Ok(()) +} + +/// Load `BlockData` from the database. +pub async fn load_block_data( + conn: &mut PoolConnection, + start_block: u32, + end_block: Option, + limit: usize, +) -> sqlx::Result> { + let raw = load_raw_block_data(conn, start_block, end_block, limit).await?; + + let mut blocks = Vec::new(); + for bytes in raw { + let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap(); + blocks.push(blockdata); + } + Ok(blocks) +} + +/// Load raw `BlockData` bytes from the database. +pub async fn load_raw_block_data( + conn: &mut PoolConnection, + start_block: u32, + end_block: Option, + limit: usize, +) -> sqlx::Result>> { + let end_condition = end_block + .map(|x| format!("AND block_height <= {x}")) + .unwrap_or("".to_string()); + let query = format!("SELECT block_data FROM index_block_data WHERE block_height >= {start_block} {end_condition} ORDER BY block_height ASC LIMIT {limit}"); + + let rows = sqlx::query(&query).fetch_all(conn).await?; + + let mut blocks = Vec::new(); + for row in rows { + let bytes = row.get::, usize>(0); + blocks.push(bytes); + } + Ok(blocks) +} + +pub async fn remove_block_data( + conn: &mut PoolConnection, +) -> sqlx::Result { + execute_query(conn, "DELETE FROM index_block_data;".to_string()).await +} + /// Return all indexers registered to this indexer serivce. #[cfg_attr(feature = "metrics", metrics)] pub async fn all_registered_indexers( @@ -653,6 +723,23 @@ pub async fn last_block_height_for_indexer( .unwrap_or(0)) } +/// Return the last block height for stored blocks. +#[cfg_attr(feature = "metrics", metrics)] +pub async fn last_block_height_for_stored_blocks( + conn: &mut PoolConnection, +) -> sqlx::Result { + let query = "SELECT MAX(block_height) FROM index_block_data LIMIT 1".to_string(); + + let row = sqlx::query(&query).fetch_one(conn).await?; + + let result = row + .try_get::(0) + .map(|id| id.to_u32().expect("Bad block height.")) + .unwrap_or(0); + + Ok(result) +} + // TODO: https://github.com/FuelLabs/fuel-indexer/issues/251 #[cfg_attr(feature = "metrics", metrics)] pub async fn asset_already_exists( @@ -913,6 +1000,8 @@ pub async fn put_many_to_many_record( Ok(()) } +/// Create a database trigger on indexmetadataentity table in the indexers +/// schema which ensures that indexed blocks must be consecutive. pub async fn create_ensure_block_height_consecutive_trigger( conn: &mut PoolConnection, namespace: &str, @@ -959,6 +1048,8 @@ pub async fn create_ensure_block_height_consecutive_trigger( Ok(()) } +/// Remove the database trogger which ensures that the indexed blocks must be +/// consecutive. pub async fn remove_ensure_block_height_consecutive_trigger( conn: &mut PoolConnection, namespace: &str, diff --git a/packages/fuel-indexer-database/src/queries.rs b/packages/fuel-indexer-database/src/queries.rs index 590c15fb0..efed1a3a9 100644 --- a/packages/fuel-indexer-database/src/queries.rs +++ b/packages/fuel-indexer-database/src/queries.rs @@ -1,5 +1,6 @@ use crate::{types::*, IndexerConnection}; use fuel_indexer_postgres as postgres; +use fuel_indexer_types::fuel::BlockData; use sqlx::types::{ chrono::{DateTime, Utc}, JsonValue, @@ -219,6 +220,53 @@ pub async fn register_indexer( } } +/// Save `BlockData` in the database. +pub async fn save_block_data( + conn: &mut IndexerConnection, + blockdata: &[BlockData], +) -> sqlx::Result<()> { + match conn { + IndexerConnection::Postgres(ref mut c) => { + postgres::save_block_data(c, blockdata).await + } + } +} + +/// Load `BlockData` from the database. +pub async fn load_block_data( + conn: &mut IndexerConnection, + start_block: u32, + end_block: Option, + limit: usize, +) -> sqlx::Result> { + match conn { + IndexerConnection::Postgres(ref mut c) => { + postgres::load_block_data(c, start_block, end_block, limit).await + } + } +} + +/// Load raw `BlockData` bytes from the database. +pub async fn load_raw_block_data( + conn: &mut IndexerConnection, + start_block: u32, + end_block: Option, + limit: usize, +) -> sqlx::Result>> { + match conn { + IndexerConnection::Postgres(ref mut c) => { + postgres::load_raw_block_data(c, start_block, end_block, limit).await + } + } +} + +/// Remove all stored `BlockData` from the database. +pub async fn remove_block_data(conn: &mut IndexerConnection) -> sqlx::Result { + match conn { + IndexerConnection::Postgres(ref mut c) => postgres::remove_block_data(c).await, + } +} + /// Return all indexers registered to this indexer serivce. pub async fn all_registered_indexers( conn: &mut IndexerConnection, @@ -287,6 +335,17 @@ pub async fn last_block_height_for_indexer( } } +/// Return the last block height that the given indexer has indexed. +pub async fn last_block_height_for_stored_blocks( + conn: &mut IndexerConnection, +) -> sqlx::Result { + match conn { + IndexerConnection::Postgres(ref mut c) => { + postgres::last_block_height_for_stored_blocks(c).await + } + } +} + pub async fn asset_already_exists( conn: &mut IndexerConnection, asset_type: &IndexerAssetType, diff --git a/packages/fuel-indexer-lib/src/config/cli.rs b/packages/fuel-indexer-lib/src/config/cli.rs index 731e6488f..e754fa252 100644 --- a/packages/fuel-indexer-lib/src/config/cli.rs +++ b/packages/fuel-indexer-lib/src/config/cli.rs @@ -192,6 +192,20 @@ pub struct IndexerArgs { #[clap(long, help = "Amount of blocks to return in a request to a Fuel node.", default_value_t = defaults::NODE_BLOCK_PAGE_SIZE)] pub block_page_size: usize, + /// Store blocks in the database and use these stored blocks to fast-forward an indexer starting up. + #[clap( + long, + help = "Store blocks in the database and use these stored blocks to fast-forward an indexer starting up." + )] + pub enable_block_store: bool, + + /// Remove all stored blocks. Use this flag together with --enable-block-store to redownload block data afresh. + #[clap( + long, + help = "Remove all stored blocks. Use this flag together with --enable-block-store to redownload block data afresh." + )] + pub remove_stored_blocks: bool, + /// Allow missing blocks or non-sequential block processing. #[clap( long, diff --git a/packages/fuel-indexer-lib/src/config/client.rs b/packages/fuel-indexer-lib/src/config/client.rs index 92d600688..716b52e13 100644 --- a/packages/fuel-indexer-lib/src/config/client.rs +++ b/packages/fuel-indexer-lib/src/config/client.rs @@ -20,7 +20,10 @@ pub struct FuelClientConfig { } impl FuelClientConfig { - pub fn health_check_uri(self) -> Uri { + pub fn uri(&self) -> Uri { + Uri::from(self) + } + pub fn health_check_uri(&self) -> Uri { let base = Uri::from(self); format!("{}{}", base, "health") .parse() @@ -34,8 +37,8 @@ impl Env for FuelClientConfig { } } -impl From for Uri { - fn from(config: FuelClientConfig) -> Self { +impl From<&FuelClientConfig> for Uri { + fn from(config: &FuelClientConfig) -> Self { let uri = derive_http_url(&config.host, &config.port); uri.parse().unwrap_or_else(|e| { panic!("Cannot parse HTTP URI from Fuel node config {config:?}: {e}") diff --git a/packages/fuel-indexer-lib/src/config/mod.rs b/packages/fuel-indexer-lib/src/config/mod.rs index fb3cf3874..637f9a7aa 100644 --- a/packages/fuel-indexer-lib/src/config/mod.rs +++ b/packages/fuel-indexer-lib/src/config/mod.rs @@ -107,6 +107,8 @@ impl Default for IndexerArgs { remove_data: defaults::REMOVE_DATA, accept_sql_queries: defaults::ACCEPT_SQL, block_page_size: defaults::NODE_BLOCK_PAGE_SIZE, + enable_block_store: defaults::ENABLE_BLOCK_STORE, + remove_stored_blocks: defaults::REMOVE_STORED_BLOCKS, allow_non_sequential_blocks: defaults::ALLOW_NON_SEQUENTIAL_BLOCKS, } } @@ -137,6 +139,8 @@ pub struct IndexerConfig { pub replace_indexer: bool, pub accept_sql_queries: bool, pub block_page_size: usize, + pub enable_block_store: bool, + pub remove_stored_blocks: bool, pub allow_non_sequential_blocks: bool, } @@ -159,6 +163,8 @@ impl Default for IndexerConfig { replace_indexer: defaults::REPLACE_INDEXER, accept_sql_queries: defaults::ACCEPT_SQL, block_page_size: defaults::NODE_BLOCK_PAGE_SIZE, + enable_block_store: defaults::ENABLE_BLOCK_STORE, + remove_stored_blocks: defaults::REMOVE_STORED_BLOCKS, allow_non_sequential_blocks: defaults::ALLOW_NON_SEQUENTIAL_BLOCKS, } } @@ -241,6 +247,8 @@ impl From for IndexerConfig { replace_indexer: args.replace_indexer, accept_sql_queries: args.accept_sql_queries, block_page_size: args.block_page_size, + enable_block_store: args.enable_block_store, + remove_stored_blocks: args.remove_stored_blocks, allow_non_sequential_blocks: args.allow_non_sequential_blocks, }; @@ -329,6 +337,8 @@ impl From for IndexerConfig { replace_indexer: defaults::REPLACE_INDEXER, accept_sql_queries: args.accept_sql_queries, block_page_size: defaults::NODE_BLOCK_PAGE_SIZE, + enable_block_store: defaults::ENABLE_BLOCK_STORE, + remove_stored_blocks: defaults::REMOVE_STORED_BLOCKS, allow_non_sequential_blocks: defaults::ALLOW_NON_SEQUENTIAL_BLOCKS, }; diff --git a/packages/fuel-indexer-lib/src/defaults.rs b/packages/fuel-indexer-lib/src/defaults.rs index fa14ebef0..af7455bc7 100644 --- a/packages/fuel-indexer-lib/src/defaults.rs +++ b/packages/fuel-indexer-lib/src/defaults.rs @@ -135,5 +135,11 @@ pub const REMOVE_DATA: bool = false; /// Allow the web server to accept raw SQL queries. pub const ACCEPT_SQL: bool = false; +/// Store blocks in the database and use these stored blocks to fast-forward an indexer starting up. +pub const ENABLE_BLOCK_STORE: bool = false; + +/// Remove all stored blocks from the database. +pub const REMOVE_STORED_BLOCKS: bool = false; + /// Allow missing blocks or non-sequential block processing. pub const ALLOW_NON_SEQUENTIAL_BLOCKS: bool = false; diff --git a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap index 83519cde3..cc1e42023 100644 --- a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap +++ b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap @@ -39,5 +39,7 @@ rate_limit: replace_indexer: false accept_sql_queries: false block_page_size: 20 +enable_block_store: false +remove_stored_blocks: false allow_non_sequential_blocks: false diff --git a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__forc_index_start_help_output.snap b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__forc_index_start_help_output.snap index da18a60e1..a331a2e41 100644 --- a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__forc_index_start_help_output.snap +++ b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__forc_index_start_help_output.snap @@ -32,6 +32,10 @@ OPTIONS: --embedded-database Automatically create and start database using provided options or defaults. + --enable-block-store + Store blocks in the database and use these stored blocks to fast-forward an indexer + starting up. + --fuel-node-host Host of the running Fuel node. [default: localhost] @@ -100,6 +104,10 @@ OPTIONS: --remove-data When replacing an indexer, also remove the indexed data. + --remove-stored-blocks + Remove all stored blocks. Use this flag together with --enable-block-store to redownload + block data afresh. + --replace-indexer Whether to allow replacing an existing indexer. If not specified, an attempt to deploy over an existing indexer results in an error. diff --git a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__fuel_indexer_run_help_output.snap b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__fuel_indexer_run_help_output.snap index 80255a0ec..611e1c5cf 100644 --- a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__fuel_indexer_run_help_output.snap +++ b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__fuel_indexer_run_help_output.snap @@ -32,6 +32,10 @@ OPTIONS: --embedded-database Automatically create and start database using provided options or defaults. + --enable-block-store + Store blocks in the database and use these stored blocks to fast-forward an indexer + starting up. + --fuel-node-host Host of the running Fuel node. [default: localhost] @@ -100,6 +104,10 @@ OPTIONS: --remove-data When replacing an indexer, also remove the indexed data. + --remove-stored-blocks + Remove all stored blocks. Use this flag together with --enable-block-store to redownload + block data afresh. + --replace-indexer Whether to allow replacing an existing indexer. If not specified, an attempt to deploy over an existing indexer results in an error. diff --git a/packages/fuel-indexer/src/commands/run.rs b/packages/fuel-indexer/src/commands/run.rs index e8b49c202..a7c5b2c13 100644 --- a/packages/fuel-indexer/src/commands/run.rs +++ b/packages/fuel-indexer/src/commands/run.rs @@ -101,16 +101,32 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { info!("Configuration: {:?}", config); + let pool = IndexerConnectionPool::connect(&config.database.to_string()).await?; + + if config.remove_stored_blocks { + info!("Removing stored blocks."); + let mut conn = pool.acquire().await?; + let count = queries::remove_block_data(&mut conn).await?; + info!("Successfully removed {count} blocks."); + } + #[allow(unused)] let (tx, rx) = channel::(defaults::SERVICE_REQUEST_CHANNEL_SIZE); - let pool = IndexerConnectionPool::connect(&config.database.to_string()).await?; - if config.run_migrations { let mut c = pool.acquire().await?; queries::run_migration(&mut c).await?; } + // Block Sync must be started after migrations to ensure that the database + // table has been created. + if config.enable_block_store { + subsystems.spawn(crate::service::create_block_sync_task( + config.clone(), + pool.clone(), + )); + }; + let mut service = IndexerService::new(config.clone(), pool.clone(), rx).await?; match manifest.map(|p| { @@ -140,6 +156,7 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { #[cfg(feature = "api-server")] subsystems.spawn({ let config = config.clone(); + let pool = pool.clone(); async { if let Err(e) = WebApi::build_and_run(config, pool, tx).await { tracing::error!("Api Server failed: {e}"); diff --git a/packages/fuel-indexer/src/executor.rs b/packages/fuel-indexer/src/executor.rs index 70a64f01d..3b5d3f669 100644 --- a/packages/fuel-indexer/src/executor.rs +++ b/packages/fuel-indexer/src/executor.rs @@ -92,16 +92,6 @@ pub fn run_executor( .map(|x| x.to_string()) .unwrap_or(config.fuel_node.to_string()); - // Where should we initially start when fetching blocks from the client? - let mut cursor = executor.manifest().start_block().map(|x| { - if x > 1 { - let decremented = x - 1; - decremented.to_string() - } else { - "0".to_string() - } - }); - info!("Indexer({indexer_uid}) subscribing to Fuel node at {fuel_node_addr}"); let client = FuelClient::from_str(&fuel_node_addr).unwrap_or_else(|e| { @@ -114,6 +104,7 @@ pub fn run_executor( warn!("No end_block specified in the manifest. Indexer({indexer_uid}) will run forever."); } + let enable_block_store = config.enable_block_store; let allow_non_sequential_blocks = config.allow_non_sequential_blocks; async move { @@ -160,6 +151,15 @@ pub fn run_executor( // Keep track of how many empty pages we've received from the client. let mut num_empty_block_reqs = 0; + let mut start_block = crate::get_start_block(&mut conn, executor.manifest()) + .await + .unwrap_or_else(|_| { + panic!("Indexer({indexer_uid}) was unable to determine the start block") + }); + + // Where should we initially start when fetching blocks from the client? + let mut cursor = Some((start_block - 1).to_string()); + loop { // If something else has signaled that this indexer should stop, then stop. if executor.kill_switch().load(Ordering::SeqCst) { @@ -168,13 +168,28 @@ pub fn run_executor( } // Fetch the next page of blocks, and the starting cursor for the subsequent page - let (block_info, next_cursor, _has_next_page) = + let (block_info, next_cursor, _has_next_page) = if enable_block_store { + let result = fuel_indexer_database::queries::load_block_data( + &mut conn, + start_block, + executor.manifest().end_block(), + block_page_size, + ) + .await; + match result { + Ok(blocks) => (blocks, None, false), + Err(e) => { + error!("Indexer({indexer_uid}) failed to load blocks from the database: {e:?}",); + continue; + } + } + } else { match retrieve_blocks_from_node( &client, block_page_size, &cursor, end_block, - &indexer_uid, + &format!("Indexer({indexer_uid})"), ) .await { @@ -194,7 +209,8 @@ pub fn run_executor( continue; } } - }; + } + }; // If our block page request from the client returns empty, we sleep for a bit, and then continue. if block_info.is_empty() { @@ -214,6 +230,8 @@ pub fn run_executor( continue; } + let last_block_height = block_info.last().map(|x| x.height); + // The client responded with actual blocks, so attempt to index them. let result = executor.handle_events(block_info).await; @@ -274,6 +292,10 @@ pub fn run_executor( // If we make it this far, we always go to the next page. cursor = next_cursor; + // What cursor does for retrieving blocks from the Fuel Node, + // start_block does for retrieving blocks from the database + start_block = last_block_height.map(|x| x + 1).unwrap_or(start_block); + // Again, check if something else has signaled that this indexer should stop, then stop. if executor.kill_switch().load(Ordering::SeqCst) { info!("Kill switch flipped, stopping Indexer({indexer_uid}). <('.')>"); @@ -298,7 +320,7 @@ pub async fn retrieve_blocks_from_node( block_page_size: usize, cursor: &Option, end_block: Option, - indexer_uid: &str, + task_id: &str, ) -> IndexerResult<(Vec, Option, bool)> { // Let's check if we need less blocks than block_page_size. let page_size = if let (Some(start), Some(end)) = (cursor, end_block) { @@ -330,7 +352,7 @@ pub async fn retrieve_blocks_from_node( }) .await .unwrap_or_else(|e| { - error!("Indexer({indexer_uid}) failed to retrieve blocks: {e:?}"); + error!("{task_id}: failed to retrieve blocks: {e:?}"); // Setting an empty cursor will cause the indexer to sleep for a bit and try again. PaginatedResult { cursor: None, diff --git a/packages/fuel-indexer/src/service.rs b/packages/fuel-indexer/src/service.rs index 16891fafd..274788fe5 100644 --- a/packages/fuel-indexer/src/service.rs +++ b/packages/fuel-indexer/src/service.rs @@ -4,6 +4,7 @@ use crate::{ }; use async_std::sync::{Arc, Mutex}; use async_std::{fs::File, io::ReadExt}; +use fuel_core_client::client::FuelClient; use fuel_indexer_database::{ queries, types::IndexerAssetType, IndexerConnection, IndexerConnectionPool, }; @@ -399,3 +400,126 @@ pub async fn get_start_block( } } } + +/// Create a tokio task for retrieving blocks from Fuel Node and saving them in +/// the database. +pub(crate) async fn create_block_sync_task( + config: IndexerConfig, + pool: IndexerConnectionPool, +) { + let task_id = "Block Sync"; + + let mut conn = pool.acquire().await.unwrap(); + + let client = + fuel_core_client::client::FuelClient::new(config.fuel_node.uri().to_string()) + .unwrap_or_else(|e| panic!("Client node connection failed: {e}.")); + + check_stored_block_data(pool.clone(), task_id, &config, &client) + .await + .unwrap_or_else(|_| panic!("{task_id} stored blocks verification failed.")); + + let last_height = queries::last_block_height_for_stored_blocks(&mut conn) + .await + .unwrap_or_else(|_| panic!("{task_id} was unable to determine the last block height for stored blocks.")); + + let mut cursor = Some(last_height.to_string()); + + info!("{task_id}: starting from Block#{}", last_height + 1); + + loop { + // Get the next page of blocks, and the starting cursor for the subsequent page + let (block_info, next_cursor, _has_next_page) = + match crate::executor::retrieve_blocks_from_node( + &client, + config.block_page_size, + &cursor, + None, + task_id, + ) + .await + { + Ok((block_info, next_cursor, _has_next_page)) => { + if !block_info.is_empty() { + let first = block_info[0].height; + let last = block_info.last().unwrap().height; + info!("{task_id}: retrieved blocks {}-{}.", first, last); + } + (block_info, next_cursor, _has_next_page) + } + Err(e) => { + error!("{task_id}: failed to retrieve blocks: {e:?}"); + tokio::time::sleep(std::time::Duration::from_secs( + fuel_indexer_lib::defaults::DELAY_FOR_SERVICE_ERROR, + )) + .await; + continue; + } + }; + + if block_info.is_empty() { + info!("{task_id}: no new blocks to process, sleeping zzZZ."); + tokio::time::sleep(std::time::Duration::from_secs( + fuel_indexer_lib::defaults::IDLE_SERVICE_WAIT_SECS, + )) + .await; + } else { + // Blocks must be in order, and there can be no missing blocks. This + // is enforced when saving to the database by a trigger. If + // `save_blockdata` succeeds, all is well. + fuel_indexer_database::queries::save_block_data(&mut conn, &block_info) + .await + .unwrap_or_else(|_| panic!("{task_id} was unable to save block data.")); + + cursor = next_cursor; + } + } +} + +// We store serialized `BlockData` in the database. Since it is not versioned, +// we need a mechanism to detect whether the format of `BlockData` has changed. +// This function fetches some blocks from the client, serializes them, and then +// compares to those stored in the database. If they are not the same, the +// blocks in the database are purged. +async fn check_stored_block_data( + pool: IndexerConnectionPool, + task_id: &str, + config: &IndexerConfig, + client: &FuelClient, +) -> IndexerResult<()> { + let (block_data_client, _, _) = crate::executor::retrieve_blocks_from_node( + client, + config.block_page_size, + &Some("0".to_string()), + None, + task_id, + ) + .await?; + + let block_data_client: Vec> = block_data_client + .iter() + .map(fuel_indexer_lib::utils::serialize) + .collect(); + + let mut conn = pool.acquire().await?; + + let block_data_database = queries::load_raw_block_data( + &mut conn, + 1, + Some(config.block_page_size as u32), + config.block_page_size, + ) + .await?; + + if block_data_database.is_empty() { + return Ok(()); + } + + if block_data_client != block_data_database { + warn!("{task_id} detected serialization format change. Removing stored blocks. {task_id} will re-sync blocks from the client."); + let count = queries::remove_block_data(&mut conn).await?; + warn!("{task_id} successfully removed {count} blocks."); + } + + Ok(()) +} diff --git a/plugins/forc-index/src/ops/forc_index_start.rs b/plugins/forc-index/src/ops/forc_index_start.rs index a94371b92..b82f29280 100644 --- a/plugins/forc-index/src/ops/forc_index_start.rs +++ b/plugins/forc-index/src/ops/forc_index_start.rs @@ -39,6 +39,8 @@ pub async fn init(command: StartCommand) -> anyhow::Result<()> { remove_data, accept_sql_queries, block_page_size, + enable_block_store, + remove_stored_blocks, allow_non_sequential_blocks, } = command; @@ -92,6 +94,8 @@ pub async fn init(command: StartCommand) -> anyhow::Result<()> { ("--auth-enabled", auth_enabled), ("--verbose", verbose), ("--local-fuel-node", local_fuel_node), + ("--enable-block-store", enable_block_store), + ("--remove-stored-blocks", remove_stored_blocks), ("--allow-non-sequential-blocks", allow_non_sequential_blocks), ]; for (opt, value) in options.iter() {