diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 8b17022992..5c10277b64 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -265,9 +265,9 @@ async fn get_feed_active( #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct OraclePriceAggregatedIntervalResponse { - pub id: String, - pub key: String, - pub sort: String, + pub id: String, // token-currency-interval-height + pub key: String, // token-currency-interval + pub sort: String, // medianTime-height pub token: Token, pub currency: Currency, pub aggregated: OraclePriceAggregatedIntervalAggregated, @@ -304,34 +304,35 @@ async fn get_feed_with_interval( 86400 => OracleIntervalSeconds::OneDay, _ => return Err(From::from("Invalid oracle interval")), }; - let key = (token, currency, interval_type); - let repo = &ctx.services.oracle_price_aggregated_interval; + let id = (token.clone(), currency.clone(), interval_type.clone(), u32::MAX); - let keys = repo - .by_key - .list(Some(key), SortOrder::Descending)? + let items = ctx + .services + .oracle_price_aggregated_interval + .by_id + .list(Some(id), SortOrder::Descending)? .take(query.size) + .take_while(|item| match item { + Ok(((t, c, i, _), _)) => t == &token.clone() && c == ¤cy.clone() && i == &interval_type.clone(), + _ => true, + }) .flatten() .collect::>(); let mut prices = Vec::new(); - for ((token, currency, _), id) in keys { - let item = repo.by_id.get(&id)?; - - let Some(item) = item else { continue }; - + for (id, item) in items { let start = item.block.median_time - (item.block.median_time % interval); let price = OraclePriceAggregatedIntervalResponse { - id: format!("{}-{}-{:?}", id.0, id.1, id.2), - key: format!("{}-{}", id.0, id.1), + id: format!("{}-{}-{:?}-{}", id.0, id.1, id.2, id.3), + key: format!("{}-{}-{:?}", id.0, id.1, id.2), sort: format!( "{}{}", hex::encode(item.block.median_time.to_be_bytes()), hex::encode(item.block.height.to_be_bytes()), ), - token, - currency, + token: token.clone(), + currency: currency.clone(), aggregated: OraclePriceAggregatedIntervalAggregated { amount: item.aggregated.amount, weightage: item.aggregated.weightage, diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 1c3fb6aff3..fbbcbd7138 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -12,8 +12,7 @@ use snafu::OptionExt; use crate::{ error::{ - ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, IndexAction, OtherSnafu, - ToPrimitiveSnafu, + ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, IndexAction, ToPrimitiveSnafu }, indexer::{Context, Index, Result}, model::{ @@ -494,10 +493,11 @@ fn start_new_bucket( aggregated: &OraclePriceAggregated, interval: OracleIntervalSeconds, ) -> Result<()> { - let key = (token.clone(), currency.clone(), interval.clone()); let id = (token, currency, interval, block.height); - let repo = &services.oracle_price_aggregated_interval; - repo.by_id.put( + services + .oracle_price_aggregated_interval + .by_id + .put( &id, &OraclePriceAggregatedInterval { aggregated: OraclePriceAggregatedIntervalAggregated { @@ -512,7 +512,6 @@ fn start_new_bucket( block: block.clone(), }, )?; - repo.by_key.put(&key, &id)?; Ok(()) } @@ -527,30 +526,28 @@ pub fn index_interval_mapper( ) -> Result<()> { let repo = &services.oracle_price_aggregated_interval; let previous = repo - .by_key + .by_id .list( - Some((token.clone(), currency.clone(), interval.clone())), + Some((token.clone(), currency.clone(), interval.clone(), u32::MAX)), SortOrder::Descending, )? - .take(1) - .flatten() - .collect::>(); + .take_while(|item| match item { + Ok(((t, c, i, _), _)) => t == &token.clone() && c == ¤cy.clone() && i == &interval.clone(), + _ => true, + }) + .next() + .transpose()?; - if previous.is_empty() { + let Some(previous) = previous else { return start_new_bucket(services, block, token, currency, aggregated, interval); - } - - for (_, id) in previous { - let aggregated_interval = repo.by_id.get(&id)?; - if let Some(aggregated_interval) = aggregated_interval { - if block.median_time - aggregated.block.median_time > interval.clone() as i64 { - return start_new_bucket(services, block, token, currency, aggregated, interval); - } + }; - forward_aggregate(services, (id, &aggregated_interval), aggregated)?; - } + if block.median_time - aggregated.block.median_time > interval.clone() as i64 { + return start_new_bucket(services, block, token, currency, aggregated, interval); } + forward_aggregate(services, previous, aggregated)?; + Ok(()) } @@ -564,29 +561,24 @@ pub fn invalidate_oracle_interval( ) -> Result<()> { let repo = &services.oracle_price_aggregated_interval; let previous = repo - .by_key + .by_id .list( - Some((token.to_string(), currency.to_string(), interval.clone())), + Some((token.to_string(), currency.to_string(), interval.clone(), u32::MAX)), SortOrder::Descending, )? - .take(1) - .map(|item| { - let (_, id) = item?; - let price = services - .oracle_price_aggregated_interval - .by_id - .get(&id)? - .context(OtherSnafu { - msg: "Missing oracle price aggregated interval index", - })?; - Ok((id, price)) - }) - .collect::>>()?; + .next() + .transpose()?; - let (prev_id, previous) = &previous[0]; + let Some((prev_id, previous)) = previous else { + return Err(Error::NotFoundIndex { + action: IndexAction::Invalidate, + r#type: "Invalidate oracle price aggregated interval".to_string(), + id: format!("{}-{}-{:?}", token, currency, interval), + }) + }; if previous.aggregated.count == 1 { - return repo.by_id.delete(prev_id); + return repo.by_id.delete(&prev_id); } let last_price = previous.aggregated.clone(); @@ -630,11 +622,7 @@ pub fn invalidate_oracle_interval( }, block: previous.block.clone(), }; - repo.by_id.put(prev_id, &aggregated_interval)?; - repo.by_key.put( - &(prev_id.0.clone(), prev_id.1.clone(), prev_id.2.clone()), - prev_id, - )?; + repo.by_id.put(&prev_id, &aggregated_interval)?; Ok(()) } @@ -642,7 +630,7 @@ fn forward_aggregate( services: &Arc, previous: ( OraclePriceAggregatedIntervalId, - &OraclePriceAggregatedInterval, + OraclePriceAggregatedInterval, ), aggregated: &OraclePriceAggregated, ) -> Result<()> { @@ -692,10 +680,6 @@ fn forward_aggregate( .oracle_price_aggregated_interval .by_id .put(&prev_id, &aggregated_interval)?; - services.oracle_price_aggregated_interval.by_key.put( - &(prev_id.0.clone(), prev_id.1.clone(), prev_id.2.clone()), - &prev_id, - )?; Ok(()) } diff --git a/lib/ain-ocean/src/lib.rs b/lib/ain-ocean/src/lib.rs index 6f5f165f42..d513af1d7f 100644 --- a/lib/ain-ocean/src/lib.rs +++ b/lib/ain-ocean/src/lib.rs @@ -79,7 +79,6 @@ pub struct OraclePriceActiveService { by_id: OraclePriceActive, } pub struct OraclePriceAggregatedIntervalService { - by_key: OraclePriceAggregatedIntervalKey, by_id: OraclePriceAggregatedInterval, } pub struct OraclePriceAggregatedService { @@ -188,7 +187,6 @@ impl Services { by_id: OraclePriceActive::new(Arc::clone(&store)), }, oracle_price_aggregated_interval: OraclePriceAggregatedIntervalService { - by_key: OraclePriceAggregatedIntervalKey::new(Arc::clone(&store)), by_id: OraclePriceAggregatedInterval::new(Arc::clone(&store)), }, oracle_price_aggregated: OraclePriceAggregatedService { diff --git a/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs b/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs index 215c3876f7..6b5bb65d5b 100644 --- a/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs +++ b/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs @@ -4,13 +4,12 @@ use serde::{Deserialize, Serialize}; use super::BlockContext; pub type OraclePriceAggregatedIntervalId = (Token, Currency, OracleIntervalSeconds, u32); //token-currency-interval-height -pub type OraclePriceAggregatedIntervalKey = (Token, Currency, OracleIntervalSeconds); //token-currency-interval pub const FIFTEEN_MINUTES: isize = 15 * 60; pub const ONE_HOUR: isize = 60 * 60; pub const ONE_DAY: isize = 24 * 60 * 60; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub enum OracleIntervalSeconds { FifteenMinutes = FIFTEEN_MINUTES, OneHour = ONE_HOUR, diff --git a/lib/ain-ocean/src/storage/mod.rs b/lib/ain-ocean/src/storage/mod.rs index 69b6c6a58f..6428af39dd 100644 --- a/lib/ain-ocean/src/storage/mod.rs +++ b/lib/ain-ocean/src/storage/mod.rs @@ -189,15 +189,6 @@ define_table! { } } -define_table! { - #[derive(Debug)] - pub struct OraclePriceAggregatedIntervalKey { - key_type = model::OraclePriceAggregatedIntervalKey, - value_type = model::OraclePriceAggregatedIntervalId, - }, - SecondaryIndex = OraclePriceAggregatedInterval -} - define_table! { #[derive(Debug)] pub struct OraclePriceFeed { @@ -444,7 +435,7 @@ define_table! { SecondaryIndex = VaultAuctionHistory } -pub const COLUMN_NAMES: [&str; 32] = [ +pub const COLUMN_NAMES: [&str; 31] = [ Block::NAME, BlockByHeight::NAME, MasternodeStats::NAME, @@ -456,7 +447,6 @@ pub const COLUMN_NAMES: [&str; 32] = [ OraclePriceActiveKey::NAME, OraclePriceAggregated::NAME, OraclePriceAggregatedInterval::NAME, - OraclePriceAggregatedIntervalKey::NAME, OraclePriceFeed::NAME, OraclePriceFeedKey::NAME, OracleTokenCurrency::NAME,