From ba28add92e6077912f5d929105c7f50eb7a976f3 Mon Sep 17 00:00:00 2001 From: lostman Date: Wed, 29 Nov 2023 01:20:02 -0800 Subject: [PATCH] enhancement: add find_many (#1469) --- packages/fuel-indexer-api-server/src/ffi.rs | 4 +- .../fuel-indexer-database/postgres/src/lib.rs | 13 ++ packages/fuel-indexer-database/src/queries.rs | 10 + packages/fuel-indexer-plugin/src/find.rs | 196 ++++++++++++++---- packages/fuel-indexer-plugin/src/wasm.rs | 37 ++-- .../indexers/fuel-indexer-test/src/lib.rs | 41 ++++ packages/fuel-indexer/src/database.rs | 20 +- packages/fuel-indexer/src/ffi.rs | 12 +- 8 files changed, 265 insertions(+), 68 deletions(-) diff --git a/packages/fuel-indexer-api-server/src/ffi.rs b/packages/fuel-indexer-api-server/src/ffi.rs index 76a9595d2..8f8f6e00c 100644 --- a/packages/fuel-indexer-api-server/src/ffi.rs +++ b/packages/fuel-indexer-api-server/src/ffi.rs @@ -19,8 +19,8 @@ pub(crate) fn check_wasm_toolchain_version(data: Vec) -> anyhow::Result, + query: String, +) -> sqlx::Result>> { + let mut builder = sqlx::QueryBuilder::new(query); + let query = builder.build(); + let rows = query.fetch_all(conn).await?; + let objects = rows.iter().map(|r| r.get(0)).collect::>>(); + Ok(objects) +} + /// Run database migrations. #[cfg_attr(feature = "metrics", metrics)] pub async fn run_migration(conn: &mut PoolConnection) -> sqlx::Result<()> { diff --git a/packages/fuel-indexer-database/src/queries.rs b/packages/fuel-indexer-database/src/queries.rs index 138be6f8e..351b6747b 100644 --- a/packages/fuel-indexer-database/src/queries.rs +++ b/packages/fuel-indexer-database/src/queries.rs @@ -144,6 +144,16 @@ pub async fn get_object( } } +/// Fetch multiple blobs of serialized `FtColumns` from the database. +pub async fn get_objects( + conn: &mut IndexerConnection, + query: String, +) -> sqlx::Result>> { + match conn { + IndexerConnection::Postgres(ref mut c) => postgres::get_objects(c, query).await, + } +} + /// Run an arbitrary query and fetch all results. /// /// Note that if the results of the query can't be converted to `JsonValue`, this function diff --git a/packages/fuel-indexer-plugin/src/find.rs b/packages/fuel-indexer-plugin/src/find.rs index c27cf74fc..0944b2d2c 100644 --- a/packages/fuel-indexer-plugin/src/find.rs +++ b/packages/fuel-indexer-plugin/src/find.rs @@ -1,68 +1,134 @@ use fuel_indexer_types::scalar::{Boolean, UID}; use sqlparser::ast as sql; +/// Represents a filter that returns a single results. +pub struct SingleFilter { + filter: String, + phantom: std::marker::PhantomData, +} + +impl std::fmt::Display for SingleFilter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} LIMIT 1", self.filter)?; + Ok(()) + } +} +/// Represents a filter with a an optional LIMIT clause that returns many +/// results. +pub struct ManyFilter { + filter: String, + limit: Option, + phantom: std::marker::PhantomData, +} + +impl ManyFilter { + pub fn limit(&self) -> Option { + self.limit + } +} + +impl std::fmt::Display for ManyFilter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.filter)?; + if let Some(limit) = self.limit { + write!(f, " LIMIT {limit}")?; + } + Ok(()) + } +} + /// Represents `filter` and `order_by` parts of the `SELECT object from {table} /// WHERE {filter} {order_by}` statement that is assembled by the indexer to /// fetch an object from the database. The table name is not available to the /// plugin and thus only a part of the statment is generated there. The indexer /// maps the TYPE_ID to the tale name and assemles the full statemnt. -pub struct QueryFragment { +pub struct OrderedFilter { filter: Filter, - field: Option, - order_by: Option, + order_by: sql::OrderByExpr, } -impl QueryFragment { +impl OrderedFilter { pub fn asc(mut self) -> Self { - if let Some(ref field) = self.field { - self.order_by = Some(sql::OrderByExpr { - expr: sql::Expr::Identifier(sql::Ident::new(field)), - asc: Some(true), - nulls_first: None, - }); - } + self.order_by.asc = Some(true); self } pub fn desc(mut self) -> Self { - if let Some(ref field) = self.field { - self.order_by = Some(sql::OrderByExpr { - expr: sql::Expr::Identifier(sql::Ident::new(field)), - asc: Some(false), - nulls_first: None, - }); - } + self.order_by.asc = Some(false); self } + + pub fn limit(self, limit: usize) -> ManyFilter { + ManyFilter { + filter: self.to_string(), + limit: Some(limit), + phantom: std::marker::PhantomData, + } + } } -/// Convert `QueryFragment` to `String`. `SELECT * from table_name` is later +/// Convert `OrderedFilter` to `String`. `SELECT * from table_name` is later /// added by the Fuel indexer to generate the entire query. -impl std::fmt::Display for QueryFragment { +impl std::fmt::Display for OrderedFilter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.filter)?; - if let Some(ref order_by) = self.order_by { - write!(f, " ORDER BY {}", order_by)?; - } + write!(f, "{} ORDER BY {}", self.filter, self.order_by)?; Ok(()) } } -/// Automatic lifting of `Filter` into `QueryFragment` leaving `ORDER BY` -/// unspecified. -impl From> for QueryFragment { - fn from(filter: Filter) -> Self { - QueryFragment { - filter, - field: None, - order_by: None, +// Conversions between different filter structs. + +impl From> for SingleFilter { + fn from(filter: Filter) -> SingleFilter { + SingleFilter { + filter: filter.to_string(), + phantom: std::marker::PhantomData, + } + } +} + +impl From> for SingleFilter { + fn from(filter: OrderedFilter) -> SingleFilter { + SingleFilter { + filter: filter.to_string(), + phantom: std::marker::PhantomData, + } + } +} + +impl From> for ManyFilter { + fn from(filter: Filter) -> ManyFilter { + ManyFilter { + filter: filter.to_string(), + limit: None, + phantom: std::marker::PhantomData, + } + } +} + +impl From> for ManyFilter { + fn from(filter: OrderedFilter) -> ManyFilter { + ManyFilter { + filter: filter.to_string(), + limit: None, + phantom: std::marker::PhantomData, + } + } +} + +impl From> for ManyFilter { + fn from(filter: SingleFilter) -> ManyFilter { + ManyFilter { + filter: filter.filter, + limit: Some(1), + phantom: std::marker::PhantomData, } } } /// Represents a WHERE clause of the SQL statement. Multiple `Filter`s can be /// joined with `and` and `or` and also ordered, at which point they become -/// `QueryFragment`s. +/// `OrderedFilter`s. pub struct Filter { filter: sql::Expr, phantom: std::marker::PhantomData, @@ -106,18 +172,29 @@ impl Filter { } } - pub fn order_by(self, f: Field) -> QueryFragment { - QueryFragment { + pub fn order_by(self, f: Field) -> OrderedFilter { + OrderedFilter { filter: self, - field: Some(f.field), - order_by: None, + order_by: sql::OrderByExpr { + expr: sql::Expr::Identifier(sql::Ident::new(f.field)), + asc: None, + nulls_first: None, + }, + } + } + + pub fn limit(self, limit: usize) -> ManyFilter { + ManyFilter { + filter: self.to_string(), + limit: Some(limit), + phantom: std::marker::PhantomData, } } } /// A trait used to convert a value of scalar type into `sqlparser::ast::Value`. /// That is, for injecting a value into the `sqlparser`'s representation which -/// we then use to generate a `QueryFragment`. +/// we then use to generate a `OrderedFilter`. pub trait ToSQLValue where Self: Sized, @@ -308,3 +385,46 @@ impl OptionField { Filter::new(expr) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_find_query_generation() { + struct MyStruct {} + + fn my_field() -> Field { + Field { + field: "my_field".to_string(), + phantom: std::marker::PhantomData, + } + } + + let f: Filter = my_field().gt(7); + assert_eq!(&f.to_string(), "my_field > 7"); + + let f: OrderedFilter = my_field().gt(7).order_by(my_field()).asc(); + assert_eq!(&f.to_string(), "my_field > 7 ORDER BY my_field ASC"); + + // Converting to SingleFilter imposes a LIMIT 1 + let sf: SingleFilter = + my_field().gt(7).order_by(my_field()).asc().into(); + assert_eq!( + &sf.to_string(), + "my_field > 7 ORDER BY my_field ASC LIMIT 1" + ); + + // SingleFilter converted to ManyFilter retains the LIMIT 1 + let mf: ManyFilter = sf.into(); + assert_eq!( + &mf.to_string(), + "my_field > 7 ORDER BY my_field ASC LIMIT 1" + ); + + // Converting to ManyFilter does not impose a LIMIT + let mf: ManyFilter = + my_field().gt(7).order_by(my_field()).desc().into(); + assert_eq!(&mf.to_string(), "my_field > 7 ORDER BY my_field DESC"); + } +} diff --git a/packages/fuel-indexer-plugin/src/wasm.rs b/packages/fuel-indexer-plugin/src/wasm.rs index 409e7b780..7957a9746 100644 --- a/packages/fuel-indexer-plugin/src/wasm.rs +++ b/packages/fuel-indexer-plugin/src/wasm.rs @@ -17,14 +17,14 @@ pub use hex::FromHex; pub use sha2::{Digest, Sha256}; pub use std::collections::{HashMap, HashSet}; -pub use crate::find::{Field, Filter, OptionField, QueryFragment}; +pub use crate::find::{Field, Filter, ManyFilter, OptionField, SingleFilter}; // These are instantiated with functions which return // `Result`. `wasmer` unwraps the `Result` and uses the -// `Err` variant for ealy exit. +// `Err` variant for early exit. extern "C" { fn ff_get_object(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8; - fn ff_single_select(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8; + fn ff_find_many(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8; fn ff_log_data(ptr: *const u8, len: u32, log_level: u32); fn ff_put_object(type_id: i64, ptr: *const u8, len: u32); fn ff_put_many_to_many_record(ptr: *const u8, len: u32); @@ -69,7 +69,7 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { /// Convert database row representation into an instance of an entity. fn from_row(vec: Vec) -> Self; - /// Convert an instance of an entity into row representation for use in a database. + /// Convert an instance of an entity into a row representation for use in a database. fn to_row(&self) -> Vec; /// Returns an entity's internal type ID. @@ -128,22 +128,35 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug { } /// Finds the first entity that satisfies the given constraints. - fn find(query: impl Into>) -> Option { - let query: QueryFragment = query.into(); + fn find(filter: impl Into>) -> Option { + let result = Self::find_many(filter.into()); + result.into_iter().next() + } + + /// Finds the entities that satisfy the given constraints. + fn find_many(filter: impl Into>) -> Vec { unsafe { - let buff = bincode::serialize(&query.to_string()).unwrap(); + let filter: ManyFilter = filter.into(); + let buff = bincode::serialize(&filter.to_string()) + .expect("Failed to serialize query"); let mut bufflen = (buff.len() as u32).to_le_bytes(); - let ptr = - ff_single_select(Self::TYPE_ID, buff.as_ptr(), bufflen.as_mut_ptr()); + let ptr = ff_find_many(Self::TYPE_ID, buff.as_ptr(), bufflen.as_mut_ptr()); if !ptr.is_null() { let len = u32::from_le_bytes(bufflen) as usize; let bytes = Vec::from_raw_parts(ptr, len, len); - let data = deserialize(&bytes).unwrap(); - Some(Self::from_row(data)) + let data: Vec> = + deserialize(&bytes).expect("Failed to deserialize data"); + data.iter() + .map(|x| { + Self::from_row( + deserialize(x).expect("Failed to deserialize data"), + ) + }) + .collect() } else { - None + vec![] } } } diff --git a/packages/fuel-indexer-tests/indexers/fuel-indexer-test/src/lib.rs b/packages/fuel-indexer-tests/indexers/fuel-indexer-test/src/lib.rs index f0bb8e713..538dd4a50 100644 --- a/packages/fuel-indexer-tests/indexers/fuel-indexer-test/src/lib.rs +++ b/packages/fuel-indexer-tests/indexers/fuel-indexer-test/src/lib.rs @@ -646,6 +646,47 @@ mod fuel_indexer_test { ) .unwrap(); assert_eq!(&f.string_value, "find5"); + + // Test searching for multiple entities, no limit + let fs: Vec = FindEntity::find_many( + FindEntity::string_value() + .gt("f".to_string()) + .order_by(FindEntity::value()) + .asc(), + ); + assert_eq!(fs.len(), 4); + assert_eq!(fs[0].string_value, "find2"); + assert_eq!(fs[1].string_value, "find3"); + assert_eq!(fs[2].string_value, "find4"); + assert_eq!(fs[3].string_value, "find5"); + + // Test searching for multiple entities, with limit + let fs: Vec = FindEntity::find_many( + FindEntity::string_value() + .gt("f".to_string()) + .order_by(FindEntity::value()) + .desc() + .limit(10), + ); + + assert_eq!(fs.len(), 4); + assert_eq!(fs[0].string_value, "find5"); + assert_eq!(fs[1].string_value, "find4"); + assert_eq!(fs[2].string_value, "find3"); + assert_eq!(fs[3].string_value, "find2"); + + // Test searching for multiple entities, with limit + let fs: Vec = FindEntity::find_many( + FindEntity::string_value() + .gt("f".to_string()) + .order_by(FindEntity::value()) + .desc() + .limit(2), + ); + + assert_eq!(fs.len(), 2); + assert_eq!(fs[0].string_value, "find5"); + assert_eq!(fs[1].string_value, "find4"); } else if block_data.height == 6 { // There is no such block. The lookup will fail. IndexMetadataEntity::find(IndexMetadataEntity::block_height().eq(777)) diff --git a/packages/fuel-indexer/src/database.rs b/packages/fuel-indexer/src/database.rs index 382417d54..3e4803c21 100644 --- a/packages/fuel-indexer/src/database.rs +++ b/packages/fuel-indexer/src/database.rs @@ -212,34 +212,32 @@ Do your WASM modules need to be rebuilt? } } - /// Get an object from the database. - pub async fn single_select( + /// Get multiple objects from the database that satisfy the given constraints. + pub async fn find_many( &mut self, type_id: i64, constraints: String, - ) -> IndexerResult>> { + ) -> IndexerResult>> { let table = &self .tables .get(&type_id) .ok_or(IndexerDatabaseError::TableMappingDoesNotExist(type_id))?; - let query = format!("SELECT object from {table} WHERE {constraints} LIMIT 1"); + let query = format!("SELECT object from {table} WHERE {constraints}"); let conn = self .stashed .as_mut() - .ok_or(IndexerError::NoTransactionError( - "single_select".to_string(), - ))?; - match queries::get_object(conn, query).await { - Ok(v) => Ok(Some(v)), + .ok_or(IndexerError::NoTransactionError("find_many".to_string()))?; + match queries::get_objects(conn, query).await { + Ok(v) => Ok(v), Err(e) => { if let sqlx::Error::RowNotFound = e { debug!("Row not found"); } else { - error!("Failed to get_object: {e:?}"); + error!("Failed to get_objects: {e:?}"); } - Ok(None) + Ok(vec![]) } } } diff --git a/packages/fuel-indexer/src/ffi.rs b/packages/fuel-indexer/src/ffi.rs index 3b91d49f5..ba97d4524 100644 --- a/packages/fuel-indexer/src/ffi.rs +++ b/packages/fuel-indexer/src/ffi.rs @@ -206,7 +206,7 @@ fn get_object( } } -fn single_select( +fn find_many( mut env: FunctionEnvMut, type_id: i64, ptr: u32, @@ -243,12 +243,14 @@ fn single_select( .db .lock() .await - .single_select(type_id, constraints) + .find_many(type_id, constraints) .await }) .unwrap(); - if let Some(bytes) = bytes { + if !bytes.is_empty() { + let bytes = fuel_indexer_lib::utils::serialize(&bytes); + let alloc_fn = idx_env .alloc .as_mut() @@ -425,7 +427,7 @@ pub fn get_exports(store: &mut Store, env: &wasmer::FunctionEnv) -> Ex let mut exports = Exports::new(); let f_get_obj = Function::new_typed_with_env(store, env, get_object); - let f_single_select = Function::new_typed_with_env(store, env, single_select); + let f_find_many = Function::new_typed_with_env(store, env, find_many); let f_put_obj = Function::new_typed_with_env(store, env, put_object); let f_log_data = Function::new_typed_with_env(store, env, log_data); let f_put_many_to_many_record = @@ -434,7 +436,7 @@ pub fn get_exports(store: &mut Store, env: &wasmer::FunctionEnv) -> Ex exports.insert("ff_early_exit".to_string(), f_early_exit); exports.insert("ff_get_object".to_string(), f_get_obj); - exports.insert("ff_single_select".to_string(), f_single_select); + exports.insert("ff_find_many".to_string(), f_find_many); exports.insert("ff_put_object".to_string(), f_put_obj); exports.insert( "ff_put_many_to_many_record".to_string(),