This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

enhancement: add block store #1369

Closed
wants to merge 14 commits into from
2 changes: 2 additions & 0 deletions Cargo.lock


35 changes: 34 additions & 1 deletion docs/src/getting-started/indexer-service-infrastructure.md
@@ -50,14 +50,17 @@ OPTIONS:
--accept-sql-queries
Allow the web server to accept raw SQL queries.

--allow-non-sequential-blocks
Allow missing blocks or non-sequential block processing.

--auth-enabled
Require users to authenticate for some operations.

--auth-strategy <AUTH_STRATEGY>
Authentication scheme used.

--block-page-size <BLOCK_PAGE_SIZE>
Amount of blocks to return in a request to a Fuel node. [default: 10]
Amount of blocks to return in a request to a Fuel node. [default: 20]

-c, --config <FILE>
Indexer service config file.
@@ -68,6 +71,10 @@ OPTIONS:
--embedded-database
Automatically create and start database using provided options or defaults.

--enable-block-store
Store blocks in the database and use these stored blocks to fast-forward an indexer
starting up.

--fuel-node-host <FUEL_NODE_HOST>
Host of the running Fuel node. [default: localhost]

@@ -133,6 +140,13 @@ OPTIONS:
--rate-limit-window-size <RATE_LIMIT_WINDOW_SIZE>
Number of seconds over which to allow --rate-limit-rps.

--remove-data
When replacing an indexer, also remove the indexed data.

--remove-stored-blocks
Remove all stored blocks. Use this flag together with --enable-block-store to redownload
block data afresh.

--replace-indexer
Whether to allow replacing an existing indexer. If not specified, an attempt to deploy
over an existing indexer results in an error.
@@ -163,6 +177,25 @@ OPTIONS:
{{#include ../../../config.yaml}}
```

### Local Block Store

Since version `TODO`, the Fuel indexer service includes the `--enable-block-store` flag, which changes how the service fetches, persists, and processes blocks.

By default, each indexer fetches its blocks directly from the Fuel node by making repeated HTTP calls, and these blocks are not persisted. When a new indexer is deployed (or an indexer is redeployed and its data is removed via the `--remove-data` flag), it starts from scratch.

Setting `--enable-block-store` changes this behavior in two ways:

1. The blocks are fetched by a single process and stored in the database in the `index_block_data` table.
2. The indexers no longer make HTTP calls; they fetch blocks from the database instead.

This means that a newly deployed indexer can be fast-forwarded using the stored blocks. Benchmarks show that using stored blocks is 4-5x faster than fetching blocks from the Fuel node.

The benchmarks also show that 4M blocks from `beta-4.fuel.network` take around `3GB` of space in the database.
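
As a rough planning aid, the figure above can be turned into a per-block estimate. The sketch below is only an illustration: it assumes decimal gigabytes (1 GB = 10^9 bytes) and that storage grows linearly with block count, neither of which is stated by the benchmark. The names `APPROX_BYTES_PER_BLOCK` and `estimated_store_bytes` are hypothetical and not part of the indexer.

```rust
/// Approximate bytes per stored block, derived from the quoted benchmark:
/// 3 GB / 4,000,000 blocks, assuming 1 GB = 10^9 bytes (an assumption).
const APPROX_BYTES_PER_BLOCK: u64 = 3_000_000_000 / 4_000_000;

/// Hypothetical helper: estimate block store size in bytes for `block_count`
/// blocks, assuming size scales linearly with the number of blocks.
fn estimated_store_bytes(block_count: u64) -> u64 {
    block_count.saturating_mul(APPROX_BYTES_PER_BLOCK)
}
```

Actual usage depends on block contents, so this should be treated as an order-of-magnitude guide only.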

This feature is handy for local testing of an indexer. An in-development indexer can be redeployed, and hundreds of thousands of blocks can be processed very quickly.

To remove stored blocks, use the `--remove-stored-blocks` flag. When it is set, the Fuel indexer service removes all entries from the `index_block_data` table on startup.

---

## Web API Server
1 change: 1 addition & 0 deletions packages/fuel-indexer-database/Cargo.toml
@@ -13,6 +13,7 @@ description = "Fuel Indexer Database"
fuel-indexer-database-types = { workspace = true }
fuel-indexer-lib = { workspace = true }
fuel-indexer-postgres = { workspace = true }
fuel-indexer-types = { workspace = true }
sqlx = { version = "0.6" }
thiserror = { workspace = true }
url = "2.2"
1 change: 1 addition & 0 deletions packages/fuel-indexer-database/postgres/Cargo.toml
@@ -16,6 +16,7 @@ fuel-indexer-database-types = { workspace = true }
fuel-indexer-lib = { workspace = true }
fuel-indexer-macro-utils = { workspace = true, optional = true }
fuel-indexer-metrics = { workspace = true, optional = true }
fuel-indexer-types = { workspace = true }
sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "postgres", "offline", "time", "chrono", "bigdecimal"] }
tracing = { workspace = true }
uuid = { version = "1.3", features = ["v4"] }
@@ -0,0 +1,3 @@
DROP TABLE index_block_data cascade;

DROP FUNCTION ensure_block_height_consecutive;
@@ -0,0 +1,24 @@
CREATE TABLE IF NOT EXISTS index_block_data (
    block_height INTEGER PRIMARY KEY NOT NULL,
    block_data BYTEA NOT NULL
);

CREATE OR REPLACE FUNCTION ensure_block_height_consecutive()
RETURNS TRIGGER AS $$
DECLARE
    block_height integer;
BEGIN
    EXECUTE format('SELECT MAX(block_height) FROM %I.%I', TG_TABLE_SCHEMA, TG_TABLE_NAME) INTO block_height;

    IF NEW.block_height IS NOT NULL AND block_height IS NOT NULL AND NEW.block_height != block_height + 1 THEN
        RAISE EXCEPTION '%.%: attempted to insert value with block_height = % while last indexed block_height = %. block_height values must be consecutive.', TG_TABLE_SCHEMA, TG_TABLE_NAME, NEW.block_height, block_height;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trigger_ensure_block_height_consecutive
BEFORE INSERT OR UPDATE ON index_block_data
FOR EACH ROW
EXECUTE FUNCTION ensure_block_height_consecutive();
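
The rule enforced by the trigger above can be modeled in a few lines of Rust. This is only an in-memory sketch of the check, not code from the PR: `check_consecutive` is a hypothetical name, `last_height` stands in for the result of `SELECT MAX(block_height)`, and `None` represents an empty table. The `NEW.block_height IS NOT NULL` guard is not modeled because the column is declared `NOT NULL`.

```rust
/// Hypothetical in-memory model of the `ensure_block_height_consecutive` check.
/// `last_height` mirrors `MAX(block_height)`; `None` means the table is empty.
fn check_consecutive(last_height: Option<i32>, new_height: i32) -> Result<(), String> {
    match last_height {
        // Empty table: the first stored block may have any height.
        None => Ok(()),
        // Non-empty table: the new height must be exactly `last + 1`.
        Some(last) if last.checked_add(1) == Some(new_height) => Ok(()),
        // Anything else (a gap, a duplicate, or going backwards) is rejected.
        Some(last) => Err(format!(
            "attempted to insert block_height = {} while last block_height = {}",
            new_height, last
        )),
    }
}
```

Under this model, the first insert into an empty table is accepted at any height, and every later insert must extend the stored range by exactly one block.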
91 changes: 91 additions & 0 deletions packages/fuel-indexer-database/postgres/src/lib.rs
@@ -3,6 +3,8 @@
use bigdecimal::ToPrimitive;
use fuel_indexer_database_types::*;
use fuel_indexer_lib::utils::sha256_digest;
use fuel_indexer_types::fuel::BlockData;
use sqlx::QueryBuilder;
use sqlx::{pool::PoolConnection, postgres::PgRow, types::JsonValue, Postgres, Row};
use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};
@@ -496,6 +498,74 @@ pub async fn register_indexer(
})
}

/// Save `BlockData` to the database.
pub async fn save_block_data(
    conn: &mut PoolConnection<Postgres>,
    blockdata: &[BlockData],
) -> sqlx::Result<()> {
    if blockdata.is_empty() {
        return Ok(());
    }

    let mut qb =
        QueryBuilder::new("INSERT INTO index_block_data (block_height, block_data) ");
    qb.push_values(blockdata, |mut b, bd| {
        let height = bd.height as i32;
        let block = fuel_indexer_lib::utils::serialize(bd);
        b.push_bind(height);
        b.push_bind(block);
    });

    qb.build().execute(conn).await?;

    Ok(())
}

/// Load `BlockData` from the database.
pub async fn load_block_data(
    conn: &mut PoolConnection<Postgres>,
    start_block: u32,
    end_block: Option<u32>,
    limit: usize,
) -> sqlx::Result<Vec<BlockData>> {
    let raw = load_raw_block_data(conn, start_block, end_block, limit).await?;

    let mut blocks = Vec::new();
    for bytes in raw {
        let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap();
        blocks.push(blockdata);
    }
    Ok(blocks)
}

/// Load raw `BlockData` bytes from the database.
pub async fn load_raw_block_data(
    conn: &mut PoolConnection<Postgres>,
    start_block: u32,
    end_block: Option<u32>,
    limit: usize,
) -> sqlx::Result<Vec<Vec<u8>>> {
    let end_condition = end_block
        .map(|x| format!("AND block_height <= {x}"))
        .unwrap_or("".to_string());
    let query = format!("SELECT block_data FROM index_block_data WHERE block_height >= {start_block} {end_condition} ORDER BY block_height ASC LIMIT {limit}");

    let rows = sqlx::query(&query).fetch_all(conn).await?;

    let mut blocks = Vec::new();
    for row in rows {
        let bytes = row.get::<Vec<u8>, usize>(0);
        blocks.push(bytes);
    }
    Ok(blocks)
}
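
To make the shape of the generated SQL easier to see, the query construction in `load_raw_block_data` can be isolated into a pure function. The sketch below is an illustration only: `build_block_query` is a hypothetical helper that does not exist in the PR, and it assembles the string piecewise so that an absent `end_block` leaves no extra whitespace. Interpolating values directly is tolerable here only because all three inputs are integers.

```rust
/// Hypothetical helper: build the SELECT used to page through stored blocks.
/// `end_block` is optional; when it is `None`, no upper bound is applied.
fn build_block_query(start_block: u32, end_block: Option<u32>, limit: usize) -> String {
    let mut query = format!(
        "SELECT block_data FROM index_block_data WHERE block_height >= {}",
        start_block
    );
    if let Some(end) = end_block {
        // Inclusive upper bound, matching the `<=` in the original query.
        query.push_str(&format!(" AND block_height <= {}", end));
    }
    // Ascending order so callers receive blocks in processing order.
    query.push_str(&format!(" ORDER BY block_height ASC LIMIT {}", limit));
    query
}
```

For example, `build_block_query(10, Some(20), 5)` selects at most five blocks with heights from 10 through 20, lowest first.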

/// Remove all stored `BlockData` from the database.
pub async fn remove_block_data(
    conn: &mut PoolConnection<Postgres>,
) -> sqlx::Result<usize> {
    execute_query(conn, "DELETE FROM index_block_data;".to_string()).await
}

/// Return all indexers registered to this indexer service.
#[cfg_attr(feature = "metrics", metrics)]
pub async fn all_registered_indexers(
@@ -653,6 +723,23 @@ pub async fn last_block_height_for_indexer(
.unwrap_or(0))
}

/// Return the last block height for stored blocks.
#[cfg_attr(feature = "metrics", metrics)]
pub async fn last_block_height_for_stored_blocks(
    conn: &mut PoolConnection<Postgres>,
) -> sqlx::Result<u32> {
    let query = "SELECT MAX(block_height) FROM index_block_data LIMIT 1".to_string();

    let row = sqlx::query(&query).fetch_one(conn).await?;

    let result = row
        .try_get::<i32, usize>(0)
        .map(|id| id.to_u32().expect("Bad block height."))
        .unwrap_or(0);

    Ok(result)
}
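
The fallback in `last_block_height_for_stored_blocks` is worth spelling out. `MAX(block_height)` yields SQL `NULL` on an empty table, and the function then falls back to `0`. The sketch below models that with an `Option<i32>`, on the assumption that decoding a `NULL` into a plain `i32` is what triggers the `unwrap_or(0)` path. `last_height_or_zero` is a hypothetical name used only for illustration.

```rust
use std::convert::TryFrom;

/// Hypothetical model of the height fallback: `None` stands for the SQL NULL
/// that `MAX(block_height)` produces when `index_block_data` is empty.
fn last_height_or_zero(max_height: Option<i32>) -> u32 {
    max_height
        // A negative height would indicate corrupt data, so this panics,
        // mirroring the `expect("Bad block height.")` in the original.
        .map(|h| u32::try_from(h).expect("Bad block height."))
        .unwrap_or(0)
}
```

One consequence of this design is that an empty block store and a store whose highest block is `0` both report `0`, so callers cannot tell the two cases apart from the return value alone.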

// TODO: https://github.com/FuelLabs/fuel-indexer/issues/251
#[cfg_attr(feature = "metrics", metrics)]
pub async fn asset_already_exists(
@@ -913,6 +1000,8 @@ pub async fn put_many_to_many_record(
Ok(())
}

/// Create a database trigger on the indexmetadataentity table in the indexer's
/// schema which ensures that indexed blocks must be consecutive.
pub async fn create_ensure_block_height_consecutive_trigger(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
@@ -959,6 +1048,8 @@ pub async fn create_ensure_block_height_consecutive_trigger(
Ok(())
}

/// Remove the database trigger which ensures that the indexed blocks must be
/// consecutive.
pub async fn remove_ensure_block_height_consecutive_trigger(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
59 changes: 59 additions & 0 deletions packages/fuel-indexer-database/src/queries.rs
@@ -1,5 +1,6 @@
use crate::{types::*, IndexerConnection};
use fuel_indexer_postgres as postgres;
use fuel_indexer_types::fuel::BlockData;
use sqlx::types::{
chrono::{DateTime, Utc},
JsonValue,
@@ -219,6 +220,53 @@ pub async fn register_indexer(
}
}

/// Save `BlockData` in the database.
pub async fn save_block_data(
    conn: &mut IndexerConnection,
    blockdata: &[BlockData],
) -> sqlx::Result<()> {
    match conn {
        IndexerConnection::Postgres(ref mut c) => {
            postgres::save_block_data(c, blockdata).await
        }
    }
}

/// Load `BlockData` from the database.
pub async fn load_block_data(
    conn: &mut IndexerConnection,
    start_block: u32,
    end_block: Option<u32>,
    limit: usize,
) -> sqlx::Result<Vec<BlockData>> {
    match conn {
        IndexerConnection::Postgres(ref mut c) => {
            postgres::load_block_data(c, start_block, end_block, limit).await
        }
    }
}

/// Load raw `BlockData` bytes from the database.
pub async fn load_raw_block_data(
    conn: &mut IndexerConnection,
    start_block: u32,
    end_block: Option<u32>,
    limit: usize,
) -> sqlx::Result<Vec<Vec<u8>>> {
    match conn {
        IndexerConnection::Postgres(ref mut c) => {
            postgres::load_raw_block_data(c, start_block, end_block, limit).await
        }
    }
}

/// Remove all stored `BlockData` from the database.
pub async fn remove_block_data(conn: &mut IndexerConnection) -> sqlx::Result<usize> {
    match conn {
        IndexerConnection::Postgres(ref mut c) => postgres::remove_block_data(c).await,
    }
}

/// Return all indexers registered to this indexer service.
pub async fn all_registered_indexers(
conn: &mut IndexerConnection,
@@ -287,6 +335,17 @@ pub async fn last_block_height_for_indexer(
}
}

/// Return the last block height for stored blocks.
pub async fn last_block_height_for_stored_blocks(
    conn: &mut IndexerConnection,
) -> sqlx::Result<u32> {
    match conn {
        IndexerConnection::Postgres(ref mut c) => {
            postgres::last_block_height_for_stored_blocks(c).await
        }
    }
}

pub async fn asset_already_exists(
conn: &mut IndexerConnection,
asset_type: &IndexerAssetType,
14 changes: 14 additions & 0 deletions packages/fuel-indexer-lib/src/config/cli.rs
@@ -192,6 +192,20 @@ pub struct IndexerArgs {
    #[clap(long, help = "Amount of blocks to return in a request to a Fuel node.", default_value_t = defaults::NODE_BLOCK_PAGE_SIZE)]
    pub block_page_size: usize,

    /// Store blocks in the database and use these stored blocks to fast-forward an indexer starting up.
    #[clap(
        long,
        help = "Store blocks in the database and use these stored blocks to fast-forward an indexer starting up."
    )]
    pub enable_block_store: bool,

    /// Remove all stored blocks. Use this flag together with --enable-block-store to redownload block data afresh.
    #[clap(
        long,
        help = "Remove all stored blocks. Use this flag together with --enable-block-store to redownload block data afresh."
    )]
    pub remove_stored_blocks: bool,

    /// Allow missing blocks or non-sequential block processing.
    #[clap(
        long,
9 changes: 6 additions & 3 deletions packages/fuel-indexer-lib/src/config/client.rs
@@ -20,7 +20,10 @@ pub struct FuelClientConfig {
}

impl FuelClientConfig {
pub fn health_check_uri(self) -> Uri {
pub fn uri(&self) -> Uri {
Uri::from(self)
}
pub fn health_check_uri(&self) -> Uri {
let base = Uri::from(self);
format!("{}{}", base, "health")
.parse()
@@ -34,8 +37,8 @@ impl Env for FuelClientConfig {
}
}

impl From<FuelClientConfig> for Uri {
fn from(config: FuelClientConfig) -> Self {
impl From<&FuelClientConfig> for Uri {
fn from(config: &FuelClientConfig) -> Self {
let uri = derive_http_url(&config.host, &config.port);
uri.parse().unwrap_or_else(|e| {
panic!("Cannot parse HTTP URI from Fuel node config {config:?}: {e}")
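
The change in this file swaps a consuming conversion (`From<FuelClientConfig>`, with `health_check_uri(self)`) for a borrowing one (`From<&FuelClientConfig>`, with `&self` methods), so a config can be converted more than once without cloning. The sketch below shows the same pattern with standard-library types only. `NodeConfig`, its fields, and the URL format are hypothetical stand-ins; the real code targets `Uri` and builds the URL with `derive_http_url`.

```rust
/// Hypothetical stand-in for `FuelClientConfig`.
#[derive(Debug, Clone)]
struct NodeConfig {
    host: String,
    port: String,
}

/// Borrowing conversion: the config is not consumed, so it stays usable.
impl From<&NodeConfig> for String {
    fn from(config: &NodeConfig) -> Self {
        format!("http://{}:{}", config.host, config.port)
    }
}

impl NodeConfig {
    /// Base URL of the node, derived without taking ownership of `self`.
    fn uri(&self) -> String {
        String::from(self)
    }

    /// Health check URL built on top of `uri()`.
    fn health_check_uri(&self) -> String {
        format!("{}/health", self.uri())
    }
}
```

With `&self` receivers, a caller can ask for both `uri()` and `health_check_uri()` on the same value, which the previous by-value signature would have prevented without a clone.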