Skip to content

Commit

Permalink
opt: rm oracle_price_aggregated_interval key
Browse files Browse the repository at this point in the history
  • Loading branch information
canonbrother committed Oct 23, 2024
1 parent 5bf7ddb commit f5fb27a
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 81 deletions.
35 changes: 18 additions & 17 deletions lib/ain-ocean/src/api/prices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ async fn get_feed_active(
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OraclePriceAggregatedIntervalResponse {
pub id: String,
pub key: String,
pub sort: String,
pub id: String, // token-currency-interval-height
pub key: String, // token-currency-interval
pub sort: String, // medianTime-height
pub token: Token,
pub currency: Currency,
pub aggregated: OraclePriceAggregatedIntervalAggregated,
Expand Down Expand Up @@ -304,34 +304,35 @@ async fn get_feed_with_interval(
86400 => OracleIntervalSeconds::OneDay,
_ => return Err(From::from("Invalid oracle interval")),
};
let key = (token, currency, interval_type);
let repo = &ctx.services.oracle_price_aggregated_interval;
let id = (token.clone(), currency.clone(), interval_type.clone(), u32::MAX);

let keys = repo
.by_key
.list(Some(key), SortOrder::Descending)?
let items = ctx
.services
.oracle_price_aggregated_interval
.by_id
.list(Some(id), SortOrder::Descending)?
.take(query.size)
.take_while(|item| match item {
Ok(((t, c, i, _), _)) => t == &token.clone() && c == &currency.clone() && i == &interval_type.clone(),
_ => true,
})
.flatten()
.collect::<Vec<_>>();

let mut prices = Vec::new();
for ((token, currency, _), id) in keys {
let item = repo.by_id.get(&id)?;

let Some(item) = item else { continue };

for (id, item) in items {
let start = item.block.median_time - (item.block.median_time % interval);

let price = OraclePriceAggregatedIntervalResponse {
id: format!("{}-{}-{:?}", id.0, id.1, id.2),
key: format!("{}-{}", id.0, id.1),
id: format!("{}-{}-{:?}-{}", id.0, id.1, id.2, id.3),
key: format!("{}-{}-{:?}", id.0, id.1, id.2),
sort: format!(
"{}{}",
hex::encode(item.block.median_time.to_be_bytes()),
hex::encode(item.block.height.to_be_bytes()),
),
token,
currency,
token: token.clone(),
currency: currency.clone(),
aggregated: OraclePriceAggregatedIntervalAggregated {
amount: item.aggregated.amount,
weightage: item.aggregated.weightage,
Expand Down
82 changes: 33 additions & 49 deletions lib/ain-ocean/src/indexer/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use snafu::OptionExt;

use crate::{
error::{
ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, IndexAction, OtherSnafu,
ToPrimitiveSnafu,
ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, IndexAction, ToPrimitiveSnafu
},
indexer::{Context, Index, Result},
model::{
Expand Down Expand Up @@ -494,10 +493,11 @@ fn start_new_bucket(
aggregated: &OraclePriceAggregated,
interval: OracleIntervalSeconds,
) -> Result<()> {
let key = (token.clone(), currency.clone(), interval.clone());
let id = (token, currency, interval, block.height);
let repo = &services.oracle_price_aggregated_interval;
repo.by_id.put(
services
.oracle_price_aggregated_interval
.by_id
.put(
&id,
&OraclePriceAggregatedInterval {
aggregated: OraclePriceAggregatedIntervalAggregated {
Expand All @@ -512,7 +512,6 @@ fn start_new_bucket(
block: block.clone(),
},
)?;
repo.by_key.put(&key, &id)?;

Ok(())
}
Expand All @@ -527,30 +526,28 @@ pub fn index_interval_mapper(
) -> Result<()> {
let repo = &services.oracle_price_aggregated_interval;
let previous = repo
.by_key
.by_id
.list(
Some((token.clone(), currency.clone(), interval.clone())),
Some((token.clone(), currency.clone(), interval.clone(), u32::MAX)),
SortOrder::Descending,
)?
.take(1)
.flatten()
.collect::<Vec<_>>();
.take_while(|item| match item {
Ok(((t, c, i, _), _)) => t == &token.clone() && c == &currency.clone() && i == &interval.clone(),
_ => true,
})
.next()
.transpose()?;

if previous.is_empty() {
let Some(previous) = previous else {
return start_new_bucket(services, block, token, currency, aggregated, interval);
}

for (_, id) in previous {
let aggregated_interval = repo.by_id.get(&id)?;
if let Some(aggregated_interval) = aggregated_interval {
if block.median_time - aggregated.block.median_time > interval.clone() as i64 {
return start_new_bucket(services, block, token, currency, aggregated, interval);
}
};

forward_aggregate(services, (id, &aggregated_interval), aggregated)?;
}
if block.median_time - aggregated.block.median_time > interval.clone() as i64 {
return start_new_bucket(services, block, token, currency, aggregated, interval);
}

forward_aggregate(services, previous, aggregated)?;

Ok(())
}

Expand All @@ -564,29 +561,24 @@ pub fn invalidate_oracle_interval(
) -> Result<()> {
let repo = &services.oracle_price_aggregated_interval;
let previous = repo
.by_key
.by_id
.list(
Some((token.to_string(), currency.to_string(), interval.clone())),
Some((token.to_string(), currency.to_string(), interval.clone(), u32::MAX)),
SortOrder::Descending,
)?
.take(1)
.map(|item| {
let (_, id) = item?;
let price = services
.oracle_price_aggregated_interval
.by_id
.get(&id)?
.context(OtherSnafu {
msg: "Missing oracle price aggregated interval index",
})?;
Ok((id, price))
})
.collect::<Result<Vec<_>>>()?;
.next()
.transpose()?;

let (prev_id, previous) = &previous[0];
let Some((prev_id, previous)) = previous else {
return Err(Error::NotFoundIndex {
action: IndexAction::Invalidate,
r#type: "Invalidate oracle price aggregated interval".to_string(),
id: format!("{}-{}-{:?}", token, currency, interval),
})
};

if previous.aggregated.count == 1 {
return repo.by_id.delete(prev_id);
return repo.by_id.delete(&prev_id);
}

let last_price = previous.aggregated.clone();
Expand Down Expand Up @@ -630,19 +622,15 @@ pub fn invalidate_oracle_interval(
},
block: previous.block.clone(),
};
repo.by_id.put(prev_id, &aggregated_interval)?;
repo.by_key.put(
&(prev_id.0.clone(), prev_id.1.clone(), prev_id.2.clone()),
prev_id,
)?;
repo.by_id.put(&prev_id, &aggregated_interval)?;
Ok(())
}

fn forward_aggregate(
services: &Arc<Services>,
previous: (
OraclePriceAggregatedIntervalId,
&OraclePriceAggregatedInterval,
OraclePriceAggregatedInterval,
),
aggregated: &OraclePriceAggregated,
) -> Result<()> {
Expand Down Expand Up @@ -692,10 +680,6 @@ fn forward_aggregate(
.oracle_price_aggregated_interval
.by_id
.put(&prev_id, &aggregated_interval)?;
services.oracle_price_aggregated_interval.by_key.put(
&(prev_id.0.clone(), prev_id.1.clone(), prev_id.2.clone()),
&prev_id,
)?;
Ok(())
}

Expand Down
2 changes: 0 additions & 2 deletions lib/ain-ocean/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ pub struct OraclePriceActiveService {
by_id: OraclePriceActive,
}
pub struct OraclePriceAggregatedIntervalService {
by_key: OraclePriceAggregatedIntervalKey,
by_id: OraclePriceAggregatedInterval,
}
pub struct OraclePriceAggregatedService {
Expand Down Expand Up @@ -188,7 +187,6 @@ impl Services {
by_id: OraclePriceActive::new(Arc::clone(&store)),
},
oracle_price_aggregated_interval: OraclePriceAggregatedIntervalService {
by_key: OraclePriceAggregatedIntervalKey::new(Arc::clone(&store)),
by_id: OraclePriceAggregatedInterval::new(Arc::clone(&store)),
},
oracle_price_aggregated: OraclePriceAggregatedService {
Expand Down
3 changes: 1 addition & 2 deletions lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ use serde::{Deserialize, Serialize};

use super::BlockContext;
pub type OraclePriceAggregatedIntervalId = (Token, Currency, OracleIntervalSeconds, u32); //token-currency-interval-height
pub type OraclePriceAggregatedIntervalKey = (Token, Currency, OracleIntervalSeconds); //token-currency-interval

pub const FIFTEEN_MINUTES: isize = 15 * 60;
pub const ONE_HOUR: isize = 60 * 60;
pub const ONE_DAY: isize = 24 * 60 * 60;

#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub enum OracleIntervalSeconds {
FifteenMinutes = FIFTEEN_MINUTES,
OneHour = ONE_HOUR,
Expand Down
12 changes: 1 addition & 11 deletions lib/ain-ocean/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,6 @@ define_table! {
}
}

define_table! {
#[derive(Debug)]
pub struct OraclePriceAggregatedIntervalKey {
key_type = model::OraclePriceAggregatedIntervalKey,
value_type = model::OraclePriceAggregatedIntervalId,
},
SecondaryIndex = OraclePriceAggregatedInterval
}

define_table! {
#[derive(Debug)]
pub struct OraclePriceFeed {
Expand Down Expand Up @@ -444,7 +435,7 @@ define_table! {
SecondaryIndex = VaultAuctionHistory
}

pub const COLUMN_NAMES: [&str; 32] = [
pub const COLUMN_NAMES: [&str; 31] = [
Block::NAME,
BlockByHeight::NAME,
MasternodeStats::NAME,
Expand All @@ -456,7 +447,6 @@ pub const COLUMN_NAMES: [&str; 32] = [
OraclePriceActiveKey::NAME,
OraclePriceAggregated::NAME,
OraclePriceAggregatedInterval::NAME,
OraclePriceAggregatedIntervalKey::NAME,
OraclePriceFeed::NAME,
OraclePriceFeedKey::NAME,
OracleTokenCurrency::NAME,
Expand Down

0 comments on commit f5fb27a

Please sign in to comment.