Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Fix Serialize Bug #96

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions bothan-core/src/manager/crypto_asset_info/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ use crate::manager::crypto_asset_info::error::{
use crate::manager::crypto_asset_info::price::tasks::get_signal_price_states;
use crate::manager::crypto_asset_info::signal_ids::set_workers_query_ids;
use crate::manager::crypto_asset_info::types::{
CryptoAssetManagerInfo, PriceSignalComputationRecords, PriceState, MONITORING_TTL,
CryptoAssetManagerInfo, PriceSignalComputationRecord, PriceState, MONITORING_TTL,
};
use crate::monitoring::records::SignalComputationRecords;
use crate::monitoring::{create_uuid, Client as MonitoringClient};
use crate::registry::{Invalid, Registry};
use crate::store::error::Error as StoreError;
Expand All @@ -29,7 +28,7 @@ pub struct CryptoAssetInfoManager<'a> {
bothan_version: Version,
registry_version_requirement: VersionReq,
monitoring_client: Option<Arc<MonitoringClient>>,
monitoring_cache: Option<Cache<String, Arc<PriceSignalComputationRecords>>>,
monitoring_cache: Option<Cache<String, Arc<Vec<PriceSignalComputationRecord>>>>,
}

impl<'a> CryptoAssetInfoManager<'a> {
Expand Down Expand Up @@ -112,7 +111,7 @@ impl<'a> CryptoAssetInfoManager<'a> {
let current_time = chrono::Utc::now().timestamp();
let stale_cutoff = current_time - self.stale_threshold;

let mut records = SignalComputationRecords::default();
let mut records = Vec::new();

let price_states =
get_signal_price_states(ids, &self.workers, &registry, stale_cutoff, &mut records)
Expand All @@ -130,7 +129,6 @@ impl<'a> CryptoAssetInfoManager<'a> {
Ok((uuid, price_states))
}

// TODO: implement tx hash mapping into monitoring
pub async fn push_monitoring_record(
&self,
uuid: String,
Expand Down
81 changes: 57 additions & 24 deletions bothan-core/src/manager/crypto_asset_info/price/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tracing::{debug, info, warn};
use crate::manager::crypto_asset_info::price::cache::PriceCache;
use crate::manager::crypto_asset_info::price::error::{Error, MissingPrerequisiteError};
use crate::manager::crypto_asset_info::types::{
PriceSignalComputationRecord, PriceSignalComputationRecords, PriceState, WorkerMap,
PriceSignalComputationRecord, PriceState, WorkerMap,
};
use crate::monitoring::records::{OperationRecord, SignalComputationRecord, SourceRecord};
use crate::registry::post_processor::PostProcess;
Expand All @@ -23,7 +23,7 @@ pub async fn get_signal_price_states<'a>(
workers: &WorkerMap<'a>,
registry: &Registry<Valid>,
stale_cutoff: i64,
records: &mut PriceSignalComputationRecords,
records: &mut Vec<PriceSignalComputationRecord>,
) -> Vec<PriceState> {
let mut cache = PriceCache::new();

Expand Down Expand Up @@ -76,16 +76,18 @@ async fn compute_signal_result<'a>(
registry: &Registry<Valid>,
stale_cutoff: i64,
cache: &PriceCache<String>,
records: &mut PriceSignalComputationRecords,
records: &mut Vec<PriceSignalComputationRecord>,
) -> Result<Decimal, Error> {
match registry.get(id) {
Some(signal) => {
let mut record = SignalComputationRecord::default();
let mut record = SignalComputationRecord::new(id.to_string());

let source_results =
compute_source_result(signal, workers, cache, stale_cutoff, &mut record).await?;

let record_ref = records.push(id.to_string(), record);
records.push(record);
// We can unwrap here because we just pushed the record, so it's guaranteed to be there
let record_ref = records.last_mut().unwrap();

let process_signal_result = signal.processor.process(source_results);
record_ref.process_result = Some(process_signal_result.clone());
Expand Down Expand Up @@ -149,19 +151,18 @@ async fn process_source_query<'a>(
source_query: &SourceQuery,
stale_cutoff: i64,
cache: &PriceCache<String>,
source_records: &mut Vec<(String, SourceRecord<Decimal>)>,
source_records: &mut Vec<SourceRecord<Decimal>>,
) -> Result<Option<(String, Decimal)>, MissingPrerequisiteError> {
let source_id = &source_query.source_id;
let query_id = &source_query.query_id;
match worker.get_asset(query_id).await {
Ok(AssetState::Available(a)) if a.timestamp.ge(&stale_cutoff) => {
// Create a record for the specific source
source_records.push((
source_id.clone(),
SourceRecord::new(query_id.clone(), a.price, vec![], None),
));
let source_record =
SourceRecord::new(source_id.clone(), query_id.clone(), a.price, vec![], None);
source_records.push(source_record);
// We can unwrap here because we just pushed the value, so it's guaranteed to be there
let (_, record) = source_records.last_mut().unwrap();
let record = source_records.last_mut().unwrap();

// Calculate the source route
compute_source_routes(&source_query.routes, a.price, cache, record)
Expand Down Expand Up @@ -304,7 +305,7 @@ mod tests {

let registry = valid_mock_registry().validate().unwrap();
let stale_cutoff = 0;
let mut records = PriceSignalComputationRecords::default();
let mut records = Vec::new();

let res =
get_signal_price_states(ids, &workers, &registry, stale_cutoff, &mut records).await;
Expand Down Expand Up @@ -333,7 +334,7 @@ mod tests {

let registry = valid_mock_registry().validate().unwrap();
let stale_cutoff = 0;
let mut records = PriceSignalComputationRecords::default();
let mut records = Vec::new();

let res =
get_signal_price_states(ids, &workers, &registry, stale_cutoff, &mut records).await;
Expand Down Expand Up @@ -381,7 +382,7 @@ mod tests {

let registry = valid_mock_registry().validate().unwrap();
let stale_cutoff = 10000;
let mut records = PriceSignalComputationRecords::default();
let mut records = Vec::new();

let res =
get_signal_price_states(ids, &workers, &registry, stale_cutoff, &mut records).await;
Expand Down Expand Up @@ -414,15 +415,19 @@ mod tests {

let cache = PriceCache::new();
let stale_cutoff = 0;
let mut record = PriceSignalComputationRecord::default();
let mut record = PriceSignalComputationRecord::new("test".to_string());

let res = compute_source_result(&signal, &workers, &cache, stale_cutoff, &mut record).await;

let expected_res = Ok(vec![("test-source".to_string(), Decimal::default())]);
let expected_record = SignalComputationRecord {
sources: vec![(
signal_id: "test".to_string(),
sources: vec![SourceRecord::new(
"test-source".to_string(),
SourceRecord::new("testusd".to_string(), Decimal::default(), vec![], None),
"testusd".to_string(),
Decimal::default(),
vec![],
None,
)],
process_result: None,
post_process_result: None,
Expand Down Expand Up @@ -454,7 +459,7 @@ mod tests {

let cache = PriceCache::new();
let stale_cutoff = 0;
let mut record = PriceSignalComputationRecord::default();
let mut record = PriceSignalComputationRecord::new("test".to_string());
let expected_record = record.clone();

let res = compute_source_result(&signal, &workers, &cache, stale_cutoff, &mut record).await;
Expand Down Expand Up @@ -482,9 +487,12 @@ mod tests {
.await;

let expected_res = Ok(Some(("test-source".to_string(), Decimal::new(1000, 0))));
let expected_source_records = vec![(
let expected_source_records = vec![SourceRecord::new(
"test-source".to_string(),
SourceRecord::new("testusd".to_string(), Decimal::new(1000, 0), vec![], None),
"testusd".to_string(),
Decimal::new(1000, 0),
vec![],
None,
)];
assert_eq!(res, expected_res);
assert_eq!(source_records, &expected_source_records);
Expand Down Expand Up @@ -544,11 +552,18 @@ mod tests {
cache.set_available("C".to_string(), Decimal::from(13));
cache.set_available("D".to_string(), Decimal::from(89));

let mut record = SourceRecord::new("test".to_string(), Decimal::one(), vec![], None);
let mut record = SourceRecord::new(
"test-source".to_string(),
"test".to_string(),
Decimal::one(),
vec![],
None,
);
let res = compute_source_routes(&routes, start, &cache, &mut record);

let expected_value = Some(Decimal::from_str_exact("76.4").unwrap());
let expected_record = SourceRecord::new(
"test-source".to_string(),
"test".to_string(),
Decimal::one(),
vec![
Expand All @@ -574,7 +589,13 @@ mod tests {
let mut cache = PriceCache::new();
cache.set_available("A".to_string(), Decimal::from(2));

let mut record = SourceRecord::new("test".to_string(), Decimal::one(), vec![], None);
let mut record = SourceRecord::new(
"test-source".to_string(),
"test".to_string(),
Decimal::one(),
vec![],
None,
);
let expected_record = record.clone();
let res = compute_source_routes(&routes, start, &cache, &mut record);

Expand All @@ -596,7 +617,13 @@ mod tests {
cache.set_available("A".to_string(), Decimal::from(1337));
cache.set_unavailable("B".to_string());

let mut record = SourceRecord::new("test".to_string(), Decimal::one(), vec![], None);
let mut record = SourceRecord::new(
"test-source".to_string(),
"test".to_string(),
Decimal::one(),
vec![],
None,
);
let expected_record = record.clone();
let res = compute_source_routes(&routes, start, &cache, &mut record);

Expand All @@ -618,7 +645,13 @@ mod tests {
cache.set_unsupported("B".to_string());
cache.set_available("C".to_string(), Decimal::from(10000));

let mut record = SourceRecord::new("test".to_string(), Decimal::one(), vec![], None);
let mut record = SourceRecord::new(
"test-source".to_string(),
"test".to_string(),
Decimal::one(),
vec![],
None,
);
let expected_record = record.clone();
let res = compute_source_routes(&routes, start, &cache, &mut record);

Expand Down
3 changes: 1 addition & 2 deletions bothan-core/src/manager/crypto_asset_info/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ use std::time::Duration;

use rust_decimal::Decimal;

use crate::monitoring::records::{SignalComputationRecord, SignalComputationRecords};
use crate::monitoring::records::SignalComputationRecord;
use crate::worker::AssetWorker;

pub const MONITORING_TTL: Duration = Duration::from_secs(60);
pub const HEARTBEAT: Duration = Duration::from_secs(60);

pub type WorkerMap<'a> = HashMap<String, Arc<dyn AssetWorker + 'a>>;
pub type PriceSignalComputationRecords = SignalComputationRecords<Decimal, Decimal>;
pub type PriceSignalComputationRecord = SignalComputationRecord<Decimal, Decimal>;

#[derive(Debug, Clone, PartialEq)]
Expand Down
4 changes: 2 additions & 2 deletions bothan-core/src/monitoring/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use semver::Version;
use serde::Serialize;

use crate::monitoring::error::Error;
use crate::monitoring::records::{SignalComputationRecords, SignalRecordsWithTxHash};
use crate::monitoring::records::{SignalComputationRecord, SignalRecordsWithTxHash};
use crate::monitoring::signer::Signer;
use crate::monitoring::types::{BothanInfo, Entry, Topic};

Expand All @@ -29,7 +29,7 @@ impl Client {
&self,
uuid: String,
tx_hash: String,
records: Arc<SignalComputationRecords<T, U>>,
records: Arc<Vec<SignalComputationRecord<T, U>>>,
) -> Result<Response, Error>
where
T: Serialize + Sized,
Expand Down
48 changes: 20 additions & 28 deletions bothan-core/src/monitoring/records.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use rust_decimal::Decimal;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

use crate::registry::post_processor::PostProcessError;
use crate::registry::processor::ProcessError;
Expand All @@ -10,45 +10,35 @@ use crate::registry::source::Operation;
#[derive(Debug, Serialize, Deserialize)]
pub struct SignalRecordsWithTxHash<T, U> {
pub tx_hash: String,
pub records: Arc<SignalComputationRecords<T, U>>,
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SignalComputationRecords<T, U> {
#[serde(flatten)]
inner: Vec<(String, SignalComputationRecord<T, U>)>,
pub records: Arc<Vec<SignalComputationRecord<T, U>>>,
}

impl<T, U> SignalComputationRecords<T, U>
where
T: Serialize + DeserializeOwned,
U: Serialize + DeserializeOwned,
{
pub fn push(
&mut self,
id: String,
value: SignalComputationRecord<T, U>,
) -> &mut SignalComputationRecord<T, U> {
self.inner.push((id, value));
// We can unwrap here because we just pushed the value so it's guaranteed to be there
let (_, value) = self.inner.last_mut().unwrap();
value
}
}

#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SignalComputationRecord<T, U>
where
T: Sized,
U: Sized,
{
pub sources: Vec<(String, SourceRecord<T>)>,
pub signal_id: String,
pub sources: Vec<SourceRecord<T>>,
pub process_result: Option<Result<U, ProcessError>>,
pub post_process_result: Option<Result<U, PostProcessError>>,
}

impl<T, U> SignalComputationRecord<T, U> {
pub(crate) fn new(signal_id: String) -> Self {
SignalComputationRecord {
signal_id,
sources: Vec::new(),
process_result: None,
post_process_result: None,
}
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SourceRecord<T: Sized> {
pub source_id: String,
pub query_id: String,
pub raw_source_value: T,
pub operations: Vec<OperationRecord>,
Expand All @@ -57,12 +47,14 @@ pub struct SourceRecord<T: Sized> {

impl<T> SourceRecord<T> {
pub fn new(
source_id: String,
query_id: String,
raw_source_value: T,
operations: Vec<OperationRecord>,
final_value: Option<T>,
) -> Self {
SourceRecord {
source_id,
query_id,
raw_source_value,
operations,
Expand Down
Loading