Skip to content

Commit

Permalink
[cli] Refactor oracle and implement rooch oracle command (#2714)
Browse files Browse the repository at this point in the history
* [framework] Add entry function and event for oracle

* [gas_market] Trusted oracle allow add new data source

* [cli] Refactor oracle and implement rooch oracle command

* [docker] Remove facuet and oracle from Dockerfile

* [script] Temp disable the API ratelimit for mainnet
  • Loading branch information
jolestar authored Sep 30, 2024
1 parent 2e2b660 commit 48ccca4
Show file tree
Hide file tree
Showing 34 changed files with 1,321 additions and 678 deletions.
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ rooch-db = { path = "crates/rooch-db" }
rooch-event = { path = "crates/rooch-event" }
rooch-ord = { path = "crates/rooch-ord" }
rooch-cosmwasm-vm = { path = "crates/rooch-cosmwasm-vm" }
rooch-oracle = { path = "crates/rooch-oracle" }

# frameworks
framework-types = { path = "frameworks/framework-types" }
Expand Down Expand Up @@ -205,7 +206,7 @@ proptest-derive = "0.3.0"
rayon = "1.5.2"
rand = "0.8.5"
rand_core = { version = "0.6.3", default-features = false }
reqwest = { version = "0.12", features = ["json"] }
reqwest = { version = "0.12", features = ["json", "stream"] }
schemars = { version = "0.8.21", features = ["either"] }
serde = { version = "1.0.210", features = ["derive", "rc"] }
serde_bytes = "0.11.15"
Expand All @@ -227,6 +228,7 @@ tiny-bip39 = "1.0.0"
tokio = { version = "1.40.0", features = ["full"] }
tokio-util = "0.7.12"
tokio-tungstenite = { version = "0.23.1", features = ["native-tls"] }
tokio-stream = "0.1.16"
tonic = { version = "0.8", features = ["gzip"] }
tracing = "0.1.37"
tracing-appender = "0.2.2"
Expand Down
8 changes: 7 additions & 1 deletion crates/rooch-oracle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@ edition = "2021"
[dependencies]
tokio = { workspace = true }
tokio-tungstenite = { workspace = true }
tokio-stream = { workspace = true }
tungstenite = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
futures-util = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
log = { workspace = true }
clap = { workspace = true }
clap = { features = ["derive"], workspace = true }
reqwest = { workspace = true }
anyhow = { workspace = true }
bcs = { workspace = true }
async-trait = { workspace = true }
pin-project = { workspace = true }

rooch-rpc-api = { workspace = true }
move-core-types = { workspace = true }
Expand Down
225 changes: 225 additions & 0 deletions crates/rooch-oracle/src/aggregator_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::datasource::OracleDecimalData;
use anyhow::Result;
use futures::stream::Stream;
use futures::task::{Context, Poll};
use move_core_types::u256::U256;
use pin_project::pin_project;
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::str::FromStr;
use tracing::warn;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, clap::ValueEnum)]
pub enum AggregateStrategy {
/// Calculate the average of the data
#[default]
Average,
/// Calculate the median of the data
Median,
/// Calculate the mode of the data
Mode,
}

impl FromStr for AggregateStrategy {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self> {
match s {
"average" => Ok(AggregateStrategy::Average),
"median" => Ok(AggregateStrategy::Median),
"mode" => Ok(AggregateStrategy::Mode),
_ => Err(anyhow::anyhow!("Invalid aggregator strategy")),
}
}
}

impl fmt::Display for AggregateStrategy {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
AggregateStrategy::Average => write!(f, "average"),
AggregateStrategy::Median => write!(f, "median"),
AggregateStrategy::Mode => write!(f, "mode"),
}
}
}

impl AggregateStrategy {
pub fn aggregate(&self, data: Vec<OracleDecimalData>) -> OracleDecimalData {
match self {
AggregateStrategy::Average => {
let mut sum: U256 = U256::zero();
for d in data.iter() {
sum += d.value;
}
let avg = sum / U256::from(data.len() as u64);
OracleDecimalData {
value: avg,
decimal: data[0].decimal,
}
}
AggregateStrategy::Median => {
let mut sorted_data = data.clone();
sorted_data.sort_by(|a, b| a.value.cmp(&b.value));
let mid = sorted_data.len() / 2;
let median = if sorted_data.len() % 2 == 0 {
(sorted_data[mid].value + sorted_data[mid - 1].value) / U256::from(2u64)
} else {
sorted_data[mid].value
};
OracleDecimalData {
value: median,
decimal: data[0].decimal,
}
}
AggregateStrategy::Mode => {
let mut freq_map = std::collections::HashMap::new();
for d in data.iter() {
*freq_map.entry(d.value).or_insert(0) += 1;
}
let mode = freq_map.iter().max_by_key(|&(_, count)| count).unwrap().0;
OracleDecimalData {
value: *mode,
decimal: data[0].decimal,
}
}
}
}
}

#[pin_project]
pub struct AggregatorStream<S> {
#[pin]
inner: S,
strategy: AggregateStrategy,
buffer: VecDeque<OracleDecimalData>,
}

impl<S> AggregatorStream<S>
where
S: Stream<Item = Result<OracleDecimalData>>,
{
pub fn new(inner: S, strategy: AggregateStrategy) -> Self {
Self {
inner,
strategy,
buffer: VecDeque::with_capacity(100),
}
}
}

impl<S> Stream for AggregatorStream<S>
where
S: Stream<Item = Result<OracleDecimalData>>,
{
type Item = OracleDecimalData;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

while let Poll::Ready(Some(item)) = this.inner.as_mut().poll_next(cx) {
match item {
Ok(data) => {
this.buffer.push_back(data);
}
Err(e) => {
warn!("Error in stream: {}", e);
}
}
}

if !this.buffer.is_empty() {
let result = this.buffer.drain(..).collect();
Poll::Ready(Some(this.strategy.aggregate(result)))
} else {
Poll::Pending
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::stream::StreamExt;

#[test]
fn test_agg_strategy() {
let data = vec![
OracleDecimalData {
value: U256::from(100u64),
decimal: 2,
},
OracleDecimalData {
value: U256::from(200u64),
decimal: 2,
},
OracleDecimalData {
value: U256::from(300u64),
decimal: 2,
},
OracleDecimalData {
value: U256::from(400u64),
decimal: 2,
},
OracleDecimalData {
value: U256::from(500u64),
decimal: 2,
},
OracleDecimalData {
value: U256::from(100u64),
decimal: 2,
}, //two 100s
];

let avg = AggregateStrategy::Average.aggregate(data.clone());
assert_eq!(avg.value, U256::from(266u64));
assert_eq!(avg.decimal, 2);

let median = AggregateStrategy::Median.aggregate(data.clone());
assert_eq!(median.value, U256::from(250u64));
assert_eq!(median.decimal, 2);

let mode = AggregateStrategy::Mode.aggregate(data.clone());
assert_eq!(mode.value, U256::from(100u64));
assert_eq!(mode.decimal, 2);
}

#[tokio::test]
async fn test_agg_stream() {
let data_stream = futures::stream::iter(vec![
Ok(OracleDecimalData {
value: U256::from(100u64),
decimal: 2,
}),
Ok(OracleDecimalData {
value: U256::from(200u64),
decimal: 2,
}),
Ok(OracleDecimalData {
value: U256::from(300u64),
decimal: 2,
}),
Ok(OracleDecimalData {
value: U256::from(400u64),
decimal: 2,
}),
Ok(OracleDecimalData {
value: U256::from(500u64),
decimal: 2,
}),
]);
let mut agg_stream = AggregatorStream::new(data_stream, AggregateStrategy::Average);

let result = agg_stream.next().await;
assert_eq!(
result,
Some(OracleDecimalData {
value: U256::from(300u64),
decimal: 2
})
);
}
}
Loading

0 comments on commit 48ccca4

Please sign in to comment.