Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into maciej/joinset
Browse files Browse the repository at this point in the history
  • Loading branch information
lostman committed Sep 11, 2023
2 parents 49a952d + 8a2636e commit c3549ed
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 77 deletions.
3 changes: 0 additions & 3 deletions packages/fuel-indexer-database/database-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,9 +566,6 @@ pub struct IndexerAsset {
/// Database ID of the indexer.
pub index_id: i64,

/// Version associated with this indexer asset.
pub version: i32,

/// Digest of the asset's bytes.
pub digest: String,

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Active: 1693382282533@@127.0.0.1@5432@postgres
ALTER TABLE index_asset_registry_manifest ADD COLUMN version integer not null;
ALTER TABLE index_asset_registry_schema ADD COLUMN version integer not null;
ALTER TABLE index_asset_registry_wasm ADD COLUMN version integer not null;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE index_asset_registry_manifest DROP COLUMN version;
ALTER TABLE index_asset_registry_schema DROP COLUMN version;
ALTER TABLE index_asset_registry_wasm DROP COLUMN version;
62 changes: 14 additions & 48 deletions packages/fuel-indexer-database/postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,27 +526,6 @@ pub async fn all_registered_indexers(
.collect::<Vec<RegisteredIndexer>>())
}

#[cfg_attr(feature = "metrics", metrics)]
pub async fn indexer_asset_version(
conn: &mut PoolConnection<Postgres>,
index_id: &i64,
asset_type: &IndexerAssetType,
) -> sqlx::Result<i64> {
match sqlx::query(&format!(
"SELECT COUNT(*)
FROM index_asset_registry_{}
WHERE index_id = {}",
asset_type.as_ref(),
index_id,
))
.fetch_one(conn)
.await
{
Ok(row) => Ok(row.try_get::<i64, usize>(0).unwrap_or(0)),
Err(_e) => Ok(0),
}
}

/// Register a single indexer asset.
#[cfg_attr(feature = "metrics", metrics)]
pub async fn register_indexer_asset(
Expand Down Expand Up @@ -577,15 +556,10 @@ pub async fn register_indexer_asset(
return Ok(asset);
}

let current_version = indexer_asset_version(conn, &index.id, &asset_type)
.await
.expect("Failed to get asset version.");

let query = format!(
"INSERT INTO index_asset_registry_{} (index_id, bytes, version, digest) VALUES ({}, $1, {}, '{digest}') RETURNING *",
"INSERT INTO index_asset_registry_{} (index_id, bytes, digest) VALUES ({}, $1, '{digest}') RETURNING *",
asset_type.as_ref(),
index.id,
current_version + 1,
);

let row = sqlx::QueryBuilder::new(query)
Expand All @@ -603,22 +577,20 @@ pub async fn register_indexer_asset(

let id = row.get(0);
let index_id = row.get(1);
let version = row.get(2);
let digest = row.get(3);
let bytes = row.get(4);
let digest = row.get(2);
let bytes = row.get(3);

Ok(IndexerAsset {
id,
index_id,
version,
digest,
bytes,
})
}

/// Return the latest version for a given indexer asset type.
/// Returns the requested asset for an indexer with the given id.
#[cfg_attr(feature = "metrics", metrics)]
pub async fn latest_asset_for_indexer(
pub async fn indexer_asset(
conn: &mut PoolConnection<Postgres>,
index_id: &i64,
asset_type: IndexerAssetType,
Expand All @@ -633,30 +605,26 @@ pub async fn latest_asset_for_indexer(

let id = row.get(0);
let index_id = row.get(1);
let version = row.get(2);
let digest = row.get(3);
let bytes = row.get(4);
let digest = row.get(2);
let bytes = row.get(3);

Ok(IndexerAsset {
id,
index_id,
version,
digest,
bytes,
})
}

/// Return the latest version for every indexer asset type.
/// Return every indexer asset type for an indexer with the give id.
#[cfg_attr(feature = "metrics", metrics)]
pub async fn latest_assets_for_indexer(
pub async fn indexer_assets(
conn: &mut PoolConnection<Postgres>,
indexer_id: &i64,
) -> sqlx::Result<IndexerAssetBundle> {
let wasm = latest_asset_for_indexer(conn, indexer_id, IndexerAssetType::Wasm).await?;
let schema =
latest_asset_for_indexer(conn, indexer_id, IndexerAssetType::Schema).await?;
let manifest =
latest_asset_for_indexer(conn, indexer_id, IndexerAssetType::Manifest).await?;
let wasm = indexer_asset(conn, indexer_id, IndexerAssetType::Wasm).await?;
let schema = indexer_asset(conn, indexer_id, IndexerAssetType::Schema).await?;
let manifest = indexer_asset(conn, indexer_id, IndexerAssetType::Manifest).await?;

Ok(IndexerAssetBundle {
wasm,
Expand Down Expand Up @@ -705,14 +673,12 @@ pub async fn asset_already_exists(
Ok(row) => {
let id = row.get(0);
let index_id = row.get(1);
let version = row.get(2);
let digest = row.get(3);
let bytes = row.get(4);
let digest = row.get(2);
let bytes = row.get(3);

Ok(Some(IndexerAsset {
id,
index_id,
version,
digest,
bytes,
}))
Expand Down
24 changes: 6 additions & 18 deletions packages/fuel-indexer-database/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,18 +230,6 @@ pub async fn all_registered_indexers(
}
}

pub async fn indexer_asset_version(
conn: &mut IndexerConnection,
index_id: &i64,
asset_type: &IndexerAssetType,
) -> sqlx::Result<i64> {
match conn {
IndexerConnection::Postgres(ref mut c) => {
postgres::indexer_asset_version(c, index_id, asset_type).await
}
}
}

/// Register a single indexer asset.
pub async fn register_indexer_asset(
conn: &mut IndexerConnection,
Expand All @@ -261,27 +249,27 @@ pub async fn register_indexer_asset(
}
}

/// Return the latest version for a given indexer asset type.
pub async fn latest_asset_for_indexer(
/// Returns the requested asset for an indexer with the given id.
pub async fn indexer_asset(
conn: &mut IndexerConnection,
index_id: &i64,
asset_type: IndexerAssetType,
) -> sqlx::Result<IndexerAsset> {
match conn {
IndexerConnection::Postgres(ref mut c) => {
postgres::latest_asset_for_indexer(c, index_id, asset_type).await
postgres::indexer_asset(c, index_id, asset_type).await
}
}
}

/// Return the latest version for every indexer asset type.
pub async fn latest_assets_for_indexer(
/// Return every indexer asset type for an indexer with the give id.
pub async fn indexer_assets(
conn: &mut IndexerConnection,
index_id: &i64,
) -> sqlx::Result<IndexerAssetBundle> {
match conn {
IndexerConnection::Postgres(ref mut c) => {
postgres::latest_assets_for_indexer(c, index_id).await
postgres::indexer_assets(c, index_id).await
}
}
}
Expand Down
9 changes: 3 additions & 6 deletions packages/fuel-indexer-schema/src/db/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,9 @@ impl IndexerSchema {

let indexer_id =
queries::get_indexer_id(&mut conn, namespace, identifier).await?;
let IndexerAsset { bytes, .. } = queries::latest_asset_for_indexer(
&mut conn,
&indexer_id,
IndexerAssetType::Manifest,
)
.await?;
let IndexerAsset { bytes, .. } =
queries::indexer_asset(&mut conn, &indexer_id, IndexerAssetType::Manifest)
.await?;
let manifest = Manifest::try_from(&bytes)?;

let schema = GraphQLSchema::new(root.schema.clone());
Expand Down
4 changes: 2 additions & 2 deletions packages/fuel-indexer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl IndexerService {
let mut conn = self.pool.acquire().await?;
let indices = queries::all_registered_indexers(&mut conn).await?;
for index in indices {
let assets = queries::latest_assets_for_indexer(&mut conn, &index.id).await?;
let assets = queries::indexer_assets(&mut conn, &index.id).await?;
let mut manifest = Manifest::try_from(&assets.manifest.bytes)?;

let start_block = get_start_block(&mut conn, &manifest).await.unwrap_or(1);
Expand Down Expand Up @@ -280,7 +280,7 @@ impl IndexerService {
{
Ok(id) => {
let assets =
queries::latest_assets_for_indexer(&mut conn, &id)
queries::indexer_assets(&mut conn, &id)
.await?;
let mut manifest =
Manifest::try_from(&assets.manifest.bytes)?;
Expand Down

0 comments on commit c3549ed

Please sign in to comment.