Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

enhancement: add block store #1369

Closed
wants to merge 14 commits into from
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/fuel-indexer-database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ description = "Fuel Indexer Database"
fuel-indexer-database-types = { workspace = true }
fuel-indexer-lib = { workspace = true }
fuel-indexer-postgres = { workspace = true }
fuel-indexer-types = { workspace = true }
sqlx = { version = "0.6" }
thiserror = { workspace = true }
url = "2.2"
1 change: 1 addition & 0 deletions packages/fuel-indexer-database/postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ fuel-indexer-database-types = { workspace = true }
fuel-indexer-lib = { workspace = true }
fuel-indexer-macro-utils = { workspace = true, optional = true }
fuel-indexer-metrics = { workspace = true, optional = true }
fuel-indexer-types = { workspace = true }
sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "postgres", "offline", "time", "chrono", "bigdecimal"] }
tracing = { workspace = true }
uuid = { version = "1.3", features = ["v4"] }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP TABLE index_block_data cascade;

DROP FUNCTION ensure_block_height_consecutive;
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
CREATE TABLE IF NOT EXISTS index_block_data (
block_height INTEGER PRIMARY KEY NOT NULL,
block_data BYTEA NOT NULL
);

CREATE OR REPLACE FUNCTION ensure_block_height_consecutive()
RETURNS TRIGGER AS $$
DECLARE
block_height integer;
BEGIN
EXECUTE format('SELECT MAX(block_height) FROM %I.%I', TG_TABLE_SCHEMA, TG_TABLE_NAME) INTO block_height;

IF NEW.block_height IS NOT NULL AND block_height IS NOT NULL AND NEW.block_height != block_height + 1 THEN
RAISE EXCEPTION '%.%: attempted to insert value with block_height = % while last indexed block_height = %. block_height values must be consecutive.', TG_TABLE_SCHEMA, TG_TABLE_NAME, NEW.block_height, block_height;
END IF;

RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trigger_ensure_block_height_consecutive
BEFORE INSERT OR UPDATE ON index_block_data
FOR EACH ROW
EXECUTE FUNCTION ensure_block_height_consecutive();
69 changes: 69 additions & 0 deletions packages/fuel-indexer-database/postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
use bigdecimal::ToPrimitive;
use fuel_indexer_database_types::*;
use fuel_indexer_lib::utils::sha256_digest;
use fuel_indexer_types::fuel::BlockData;
use sqlx::QueryBuilder;
use sqlx::{pool::PoolConnection, postgres::PgRow, types::JsonValue, Postgres, Row};
use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};
Expand Down Expand Up @@ -496,6 +498,56 @@ pub async fn register_indexer(
})
}

pub async fn save_block_data(
conn: &mut PoolConnection<Postgres>,
blockdata: &[BlockData],
) -> sqlx::Result<()> {
if blockdata.is_empty() {
return Ok(());
}

let mut qb =
QueryBuilder::new("INSERT INTO index_block_data (block_height, block_data) ");
qb.push_values(blockdata, |mut b, bd| {
let height = bd.height as i32;
let block = fuel_indexer_lib::utils::serialize(bd);
b.push_bind(height);
b.push_bind(block);
});

qb.build().execute(conn).await?;

Ok(())
}

pub async fn load_block_data(
conn: &mut PoolConnection<Postgres>,
start_block: u32,
end_block: Option<u32>,
limit: usize,
) -> sqlx::Result<Vec<BlockData>> {
let end_condition = end_block
.map(|x| format!("AND block_height <= {x}"))
.unwrap_or("".to_string());
let query = format!("SELECT block_data FROM index_block_data WHERE block_height >= {start_block} {end_condition} ORDER BY block_height ASC LIMIT {limit}");

let rows = sqlx::query(&query).fetch_all(conn).await?;

let mut blocks = Vec::new();
for row in rows {
let bytes = row.get::<Vec<u8>, usize>(0);
let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap();
blocks.push(blockdata);
}
Ok(blocks)
}

pub async fn remove_block_data(
conn: &mut PoolConnection<Postgres>,
) -> sqlx::Result<usize> {
execute_query(conn, "DELETE FROM index_block_data;".to_string()).await
}

/// Return all indexers registered to this indexer serivce.
#[cfg_attr(feature = "metrics", metrics)]
pub async fn all_registered_indexers(
Expand Down Expand Up @@ -653,6 +705,23 @@ pub async fn last_block_height_for_indexer(
.unwrap_or(0))
}

/// Return the last block height for stored blocks.
#[cfg_attr(feature = "metrics", metrics)]
pub async fn last_block_height_for_stored_blocks(
conn: &mut PoolConnection<Postgres>,
) -> sqlx::Result<u32> {
let query = "SELECT MAX(block_height) FROM index_block_data LIMIT 1".to_string();

let row = sqlx::query(&query).fetch_one(conn).await?;

let result = row
.try_get::<i32, usize>(0)
.map(|id| id.to_u32().expect("Bad block height."))
.unwrap_or(0);

Ok(result)
}

// TODO: https://github.com/FuelLabs/fuel-indexer/issues/251
#[cfg_attr(feature = "metrics", metrics)]
pub async fn asset_already_exists(
Expand Down
45 changes: 45 additions & 0 deletions packages/fuel-indexer-database/src/queries.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{types::*, IndexerConnection};
use fuel_indexer_postgres as postgres;
use fuel_indexer_types::fuel::BlockData;
use sqlx::types::{
chrono::{DateTime, Utc},
JsonValue,
Expand Down Expand Up @@ -219,6 +220,39 @@ pub async fn register_indexer(
}
}

/// Save `BlockData` in the database.
pub async fn save_block_data(
conn: &mut IndexerConnection,
blockdata: &[BlockData],
) -> sqlx::Result<()> {
match conn {
IndexerConnection::Postgres(ref mut c) => {
postgres::save_block_data(c, blockdata).await
}
}
}

/// Load `BlockData` from the database.
pub async fn load_block_data(
conn: &mut IndexerConnection,
start_block: u32,
end_block: Option<u32>,
limit: usize,
) -> sqlx::Result<Vec<BlockData>> {
match conn {
IndexerConnection::Postgres(ref mut c) => {
postgres::load_block_data(c, start_block, end_block, limit).await
}
}
}

/// Remove all stored `BlockData` from the database.
pub async fn remove_block_data(conn: &mut IndexerConnection) -> sqlx::Result<usize> {
match conn {
IndexerConnection::Postgres(ref mut c) => postgres::remove_block_data(c).await,
}
}

/// Return all indexers registered to this indexer serivce.
pub async fn all_registered_indexers(
conn: &mut IndexerConnection,
Expand Down Expand Up @@ -287,6 +321,17 @@ pub async fn last_block_height_for_indexer(
}
}

/// Return the last block height that the given indexer has indexed.
pub async fn last_block_height_for_stored_blocks(
conn: &mut IndexerConnection,
) -> sqlx::Result<u32> {
match conn {
IndexerConnection::Postgres(ref mut c) => {
postgres::last_block_height_for_stored_blocks(c).await
}
}
}

pub async fn asset_already_exists(
conn: &mut IndexerConnection,
asset_type: &IndexerAssetType,
Expand Down
14 changes: 14 additions & 0 deletions packages/fuel-indexer-lib/src/config/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,20 @@ pub struct IndexerArgs {
/// Amount of blocks to return in a request to a Fuel node.
#[clap(long, help = "Amount of blocks to return in a request to a Fuel node.", default_value_t = defaults::NODE_BLOCK_PAGE_SIZE)]
pub block_page_size: usize,

/// Store blocks in the database and use these stored blocks to fast-forward an indexer starting up.
#[clap(
long,
help = "Store blocks in the database and use these stored blocks to fast-forward an indexer starting up."
)]
pub enable_block_store: bool,

/// Remove all stored blocks. Use this flag together with --enable-block-store to redownload block data afresh.
#[clap(
long,
help = "Remove all stored blocks. Use this flag together with --enable-block-store to redownload block data afresh."
)]
pub remove_stored_blocks: bool,
}

#[derive(Debug, Parser, Clone)]
Expand Down
9 changes: 6 additions & 3 deletions packages/fuel-indexer-lib/src/config/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ pub struct FuelClientConfig {
}

impl FuelClientConfig {
pub fn health_check_uri(self) -> Uri {
pub fn uri(&self) -> Uri {
Uri::from(self)
}
pub fn health_check_uri(&self) -> Uri {
let base = Uri::from(self);
format!("{}{}", base, "health")
.parse()
Expand All @@ -34,8 +37,8 @@ impl Env for FuelClientConfig {
}
}

impl From<FuelClientConfig> for Uri {
fn from(config: FuelClientConfig) -> Self {
impl From<&FuelClientConfig> for Uri {
fn from(config: &FuelClientConfig) -> Self {
let uri = derive_http_url(&config.host, &config.port);
uri.parse().unwrap_or_else(|e| {
panic!("Cannot parse HTTP URI from Fuel node config {config:?}: {e}")
Expand Down
10 changes: 10 additions & 0 deletions packages/fuel-indexer-lib/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ impl Default for IndexerArgs {
remove_data: defaults::REMOVE_DATA,
accept_sql_queries: defaults::ACCEPT_SQL,
block_page_size: defaults::NODE_BLOCK_PAGE_SIZE,
enable_block_store: defaults::ENABLE_BLOCK_STORE,
remove_stored_blocks: defaults::REMOVE_STORED_BLOCKS,
}
}
}
Expand Down Expand Up @@ -136,6 +138,8 @@ pub struct IndexerConfig {
pub replace_indexer: bool,
pub accept_sql_queries: bool,
pub block_page_size: usize,
pub enable_block_store: bool,
pub remove_stored_blocks: bool,
}

impl Default for IndexerConfig {
Expand All @@ -157,6 +161,8 @@ impl Default for IndexerConfig {
replace_indexer: defaults::REPLACE_INDEXER,
accept_sql_queries: defaults::ACCEPT_SQL,
block_page_size: defaults::NODE_BLOCK_PAGE_SIZE,
enable_block_store: defaults::ENABLE_BLOCK_STORE,
remove_stored_blocks: defaults::REMOVE_STORED_BLOCKS,
}
}
}
Expand Down Expand Up @@ -238,6 +244,8 @@ impl From<IndexerArgs> for IndexerConfig {
replace_indexer: args.replace_indexer,
accept_sql_queries: args.accept_sql_queries,
block_page_size: args.block_page_size,
enable_block_store: args.enable_block_store,
remove_stored_blocks: args.remove_stored_blocks,
};

config
Expand Down Expand Up @@ -325,6 +333,8 @@ impl From<ApiServerArgs> for IndexerConfig {
replace_indexer: defaults::REPLACE_INDEXER,
accept_sql_queries: args.accept_sql_queries,
block_page_size: defaults::NODE_BLOCK_PAGE_SIZE,
enable_block_store: defaults::ENABLE_BLOCK_STORE,
remove_stored_blocks: defaults::REMOVE_STORED_BLOCKS,
};

config
Expand Down
6 changes: 6 additions & 0 deletions packages/fuel-indexer-lib/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,9 @@ pub const REMOVE_DATA: bool = false;

/// Allow the web server to accept raw SQL queries.
pub const ACCEPT_SQL: bool = false;

/// Store blocks in the database and use these stored blocks to fast-forward an indexer starting up.
pub const ENABLE_BLOCK_STORE: bool = false;

/// Remove all stored blocks from the database.
pub const REMOVE_STORED_BLOCKS: bool = false;
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ rate_limit:
replace_indexer: false
accept_sql_queries: false
block_page_size: 20

enable_block_store: false
remove_stored_blocks: false
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ OPTIONS:
--embedded-database
Automatically create and start database using provided options or defaults.

--enable-block-store
Store blocks in the database and use these stored blocks to fast-forward an indexer
starting up.

--fuel-node-host <FUEL_NODE_HOST>
Host of the running Fuel node. [default: localhost]

Expand Down Expand Up @@ -97,6 +101,10 @@ OPTIONS:
--remove-data
When replacing an indexer, also remove the indexed data.

--remove-stored-blocks
Remove all stored blocks. Use this flag together with --enable-block-store to redownload
block data afresh.

--replace-indexer
Whether to allow replacing an existing indexer. If not specified, an attempt to deploy
over an existing indexer results in an error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ OPTIONS:
--embedded-database
Automatically create and start database using provided options or defaults.

--enable-block-store
Store blocks in the database and use these stored blocks to fast-forward an indexer
starting up.

--fuel-node-host <FUEL_NODE_HOST>
Host of the running Fuel node. [default: localhost]

Expand Down Expand Up @@ -97,6 +101,10 @@ OPTIONS:
--remove-data
When replacing an indexer, also remove the indexed data.

--remove-stored-blocks
Remove all stored blocks. Use this flag together with --enable-block-store to redownload
block data afresh.

--replace-indexer
Whether to allow replacing an existing indexer. If not specified, an attempt to deploy
over an existing indexer results in an error.
Expand Down
Loading