diff --git a/bothan-core/src/manager/crypto_asset_info/manager.rs b/bothan-core/src/manager/crypto_asset_info/manager.rs index 04fc0d2..e2170ff 100644 --- a/bothan-core/src/manager/crypto_asset_info/manager.rs +++ b/bothan-core/src/manager/crypto_asset_info/manager.rs @@ -12,9 +12,8 @@ use crate::manager::crypto_asset_info::error::{ use crate::manager::crypto_asset_info::price::tasks::get_signal_price_states; use crate::manager::crypto_asset_info::signal_ids::set_workers_query_ids; use crate::manager::crypto_asset_info::types::{ - CryptoAssetManagerInfo, PriceSignalComputationRecords, PriceState, MONITORING_TTL, + CryptoAssetManagerInfo, PriceSignalComputationRecord, PriceState, MONITORING_TTL, }; -use crate::monitoring::records::SignalComputationRecords; use crate::monitoring::{create_uuid, Client as MonitoringClient}; use crate::registry::{Invalid, Registry}; use crate::store::error::Error as StoreError; @@ -29,7 +28,7 @@ pub struct CryptoAssetInfoManager<'a> { bothan_version: Version, registry_version_requirement: VersionReq, monitoring_client: Option>, - monitoring_cache: Option>>, + monitoring_cache: Option>>>, } impl<'a> CryptoAssetInfoManager<'a> { @@ -112,7 +111,7 @@ impl<'a> CryptoAssetInfoManager<'a> { let current_time = chrono::Utc::now().timestamp(); let stale_cutoff = current_time - self.stale_threshold; - let mut records = SignalComputationRecords::default(); + let mut records = Vec::new(); let price_states = get_signal_price_states(ids, &self.workers, ®istry, stale_cutoff, &mut records) @@ -130,7 +129,6 @@ impl<'a> CryptoAssetInfoManager<'a> { Ok((uuid, price_states)) } - // TODO: implement tx hash mapping into monitoring pub async fn push_monitoring_record( &self, uuid: String, diff --git a/bothan-core/src/manager/crypto_asset_info/price/tasks.rs b/bothan-core/src/manager/crypto_asset_info/price/tasks.rs index a2e7238..8f0be8b 100644 --- a/bothan-core/src/manager/crypto_asset_info/price/tasks.rs +++ b/bothan-core/src/manager/crypto_asset_info/price/tasks.rs @@ -7,7 +7,7 @@ use tracing::{debug, info, warn}; use crate::manager::crypto_asset_info::price::cache::PriceCache; use crate::manager::crypto_asset_info::price::error::{Error, MissingPrerequisiteError}; use crate::manager::crypto_asset_info::types::{ - PriceSignalComputationRecord, PriceSignalComputationRecords, PriceState, WorkerMap, + PriceSignalComputationRecord, PriceState, WorkerMap, }; use crate::monitoring::records::{OperationRecord, SignalComputationRecord, SourceRecord}; use crate::registry::post_processor::PostProcess; @@ -23,7 +23,7 @@ pub async fn get_signal_price_states<'a>( workers: &WorkerMap<'a>, registry: &Registry, stale_cutoff: i64, - records: &mut PriceSignalComputationRecords, + records: &mut Vec, ) -> Vec { let mut cache = PriceCache::new(); @@ -76,16 +76,18 @@ async fn compute_signal_result<'a>( registry: &Registry, stale_cutoff: i64, cache: &PriceCache, - records: &mut PriceSignalComputationRecords, + records: &mut Vec, ) -> Result { match registry.get(id) { Some(signal) => { - let mut record = SignalComputationRecord::default(); + let mut record = SignalComputationRecord::new(id.to_string()); let source_results = compute_source_result(signal, workers, cache, stale_cutoff, &mut record).await?; - let record_ref = records.push(id.to_string(), record); + records.push(record); + // We can unwrap here because we just pushed the record, so it's guaranteed to be there + let record_ref = records.last_mut().unwrap(); let process_signal_result = signal.processor.process(source_results); record_ref.process_result = Some(process_signal_result.clone()); @@ -149,19 +151,18 @@ async fn process_source_query<'a>( source_query: &SourceQuery, stale_cutoff: i64, cache: &PriceCache, - source_records: &mut Vec<(String, SourceRecord)>, + source_records: &mut Vec>, ) -> Result, MissingPrerequisiteError> { let source_id = &source_query.source_id; let query_id = &source_query.query_id; match worker.get_asset(query_id).await { Ok(AssetState::Available(a)) if a.timestamp.ge(&stale_cutoff) => { // Create a record for the specific source - source_records.push(( - source_id.clone(), - SourceRecord::new(query_id.clone(), a.price, vec![], None), - )); + let source_record = + SourceRecord::new(source_id.clone(), query_id.clone(), a.price, vec![], None); + source_records.push(source_record); // We can unwrap here because we just pushed the value, so it's guaranteed to be there - let (_, record) = source_records.last_mut().unwrap(); + let record = source_records.last_mut().unwrap(); // Calculate the source route compute_source_routes(&source_query.routes, a.price, cache, record) @@ -304,7 +305,7 @@ mod tests { let registry = valid_mock_registry().validate().unwrap(); let stale_cutoff = 0; - let mut records = PriceSignalComputationRecords::default(); + let mut records = Vec::new(); let res = get_signal_price_states(ids, &workers, ®istry, stale_cutoff, &mut records).await; @@ -333,7 +334,7 @@ mod tests { let registry = valid_mock_registry().validate().unwrap(); let stale_cutoff = 0; - let mut records = PriceSignalComputationRecords::default(); + let mut records = Vec::new(); let res = get_signal_price_states(ids, &workers, ®istry, stale_cutoff, &mut records).await; @@ -381,7 +382,7 @@ mod tests { let registry = valid_mock_registry().validate().unwrap(); let stale_cutoff = 10000; - let mut records = PriceSignalComputationRecords::default(); + let mut records = Vec::new(); let res = get_signal_price_states(ids, &workers, ®istry, stale_cutoff, &mut records).await; @@ -414,15 +415,19 @@ mod tests { let cache = PriceCache::new(); let stale_cutoff = 0; - let mut record = PriceSignalComputationRecord::default(); + let mut record = PriceSignalComputationRecord::new("test".to_string()); let res = compute_source_result(&signal, &workers, &cache, stale_cutoff, &mut record).await; let expected_res = Ok(vec![("test-source".to_string(), Decimal::default())]); let expected_record = SignalComputationRecord { - sources: vec![( + signal_id: "test".to_string(), + sources: vec![SourceRecord::new( "test-source".to_string(), - SourceRecord::new("testusd".to_string(), Decimal::default(), vec![], None), + "testusd".to_string(), + Decimal::default(), + vec![], + None, )], process_result: None, post_process_result: None, @@ -454,7 +459,7 @@ mod tests { let cache = PriceCache::new(); let stale_cutoff = 0; - let mut record = PriceSignalComputationRecord::default(); + let mut record = PriceSignalComputationRecord::new("test".to_string()); let expected_record = record.clone(); let res = compute_source_result(&signal, &workers, &cache, stale_cutoff, &mut record).await; @@ -482,9 +487,12 @@ mod tests { .await; let expected_res = Ok(Some(("test-source".to_string(), Decimal::new(1000, 0)))); - let expected_source_records = vec![( + let expected_source_records = vec![SourceRecord::new( "test-source".to_string(), - SourceRecord::new("testusd".to_string(), Decimal::new(1000, 0), vec![], None), + "testusd".to_string(), + Decimal::new(1000, 0), + vec![], + None, )]; assert_eq!(res, expected_res); assert_eq!(source_records, &expected_source_records); @@ -544,11 +552,18 @@ mod tests { cache.set_available("C".to_string(), Decimal::from(13)); cache.set_available("D".to_string(), Decimal::from(89)); - let mut record = SourceRecord::new("test".to_string(), Decimal::one(), vec![], None); + let mut record = SourceRecord::new( + "test-source".to_string(), + "test".to_string(), + Decimal::one(), + vec![], + None, + ); let res = compute_source_routes(&routes, start, &cache, &mut record); let expected_value = Some(Decimal::from_str_exact("76.4").unwrap()); let expected_record = SourceRecord::new( + "test-source".to_string(), "test".to_string(), Decimal::one(), vec![ @@ -574,7 +589,13 @@ mod tests { let mut cache = PriceCache::new(); cache.set_available("A".to_string(), Decimal::from(2)); - let mut record = SourceRecord::new("test".to_string(), Decimal::one(), vec![], None); + let mut record = SourceRecord::new( + "test-source".to_string(), + "test".to_string(), + Decimal::one(), + vec![], + None, + ); let expected_record = record.clone(); let res = compute_source_routes(&routes, start, &cache, &mut record); @@ -596,7 +617,13 @@ mod tests { cache.set_available("A".to_string(), Decimal::from(1337)); cache.set_unavailable("B".to_string()); - let mut record = SourceRecord::new("test".to_string(), Decimal::one(), vec![], None); + let mut record = SourceRecord::new( + "test-source".to_string(), + "test".to_string(), + Decimal::one(), + vec![], + None, + ); let expected_record = record.clone(); let res = compute_source_routes(&routes, start, &cache, &mut record); @@ -618,7 +645,13 @@ mod tests { cache.set_unsupported("B".to_string()); cache.set_available("C".to_string(), Decimal::from(10000)); - let mut record = SourceRecord::new("test".to_string(), Decimal::one(), vec![], None); + let mut record = SourceRecord::new( + "test-source".to_string(), + "test".to_string(), + Decimal::one(), + vec![], + None, + ); let expected_record = record.clone(); let res = compute_source_routes(&routes, start, &cache, &mut record); diff --git a/bothan-core/src/manager/crypto_asset_info/types.rs b/bothan-core/src/manager/crypto_asset_info/types.rs index c1fbf3f..47e786d 100644 --- a/bothan-core/src/manager/crypto_asset_info/types.rs +++ b/bothan-core/src/manager/crypto_asset_info/types.rs @@ -4,14 +4,13 @@ use std::time::Duration; use rust_decimal::Decimal; -use crate::monitoring::records::{SignalComputationRecord, SignalComputationRecords}; +use crate::monitoring::records::SignalComputationRecord; use crate::worker::AssetWorker; pub const MONITORING_TTL: Duration = Duration::from_secs(60); pub const HEARTBEAT: Duration = Duration::from_secs(60); pub type WorkerMap<'a> = HashMap>; -pub type PriceSignalComputationRecords = SignalComputationRecords; pub type PriceSignalComputationRecord = SignalComputationRecord; #[derive(Debug, Clone, PartialEq)] diff --git a/bothan-core/src/monitoring/client.rs b/bothan-core/src/monitoring/client.rs index e14dd3e..a27727f 100644 --- a/bothan-core/src/monitoring/client.rs +++ b/bothan-core/src/monitoring/client.rs @@ -6,7 +6,7 @@ use semver::Version; use serde::Serialize; use crate::monitoring::error::Error; -use crate::monitoring::records::{SignalComputationRecords, SignalRecordsWithTxHash}; +use crate::monitoring::records::{SignalComputationRecord, SignalRecordsWithTxHash}; use crate::monitoring::signer::Signer; use crate::monitoring::types::{BothanInfo, Entry, Topic}; @@ -29,7 +29,7 @@ impl Client { &self, uuid: String, tx_hash: String, - records: Arc>, + records: Arc>>, ) -> Result where T: Serialize + Sized, diff --git a/bothan-core/src/monitoring/records.rs b/bothan-core/src/monitoring/records.rs index 41b2dc5..82e9c99 100644 --- a/bothan-core/src/monitoring/records.rs +++ b/bothan-core/src/monitoring/records.rs @@ -1,7 +1,7 @@ +use std::sync::Arc; + use rust_decimal::Decimal; -use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use std::sync::Arc; use crate::registry::post_processor::PostProcessError; use crate::registry::processor::ProcessError; @@ -10,45 +10,35 @@ use crate::registry::source::Operation; #[derive(Debug, Serialize, Deserialize)] pub struct SignalRecordsWithTxHash { pub tx_hash: String, - pub records: Arc>, -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct SignalComputationRecords { - #[serde(flatten)] - inner: Vec<(String, SignalComputationRecord)>, + pub records: Arc>>, } -impl SignalComputationRecords -where - T: Serialize + DeserializeOwned, - U: Serialize + DeserializeOwned, -{ - pub fn push( - &mut self, - id: String, - value: SignalComputationRecord, - ) -> &mut SignalComputationRecord { - self.inner.push((id, value)); - // We can unwrap here because we just pushed the value so it's guaranteed to be there - let (_, value) = self.inner.last_mut().unwrap(); - value - } -} - -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct SignalComputationRecord where T: Sized, U: Sized, { - pub sources: Vec<(String, SourceRecord)>, + pub signal_id: String, + pub sources: Vec>, pub process_result: Option>, pub post_process_result: Option>, } +impl SignalComputationRecord { + pub(crate) fn new(signal_id: String) -> Self { + SignalComputationRecord { + signal_id, + sources: Vec::new(), + process_result: None, + post_process_result: None, + } + } +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct SourceRecord { + pub source_id: String, pub query_id: String, pub raw_source_value: T, pub operations: Vec, @@ -57,12 +47,14 @@ pub struct SourceRecord { impl SourceRecord { pub fn new( + source_id: String, query_id: String, raw_source_value: T, operations: Vec, final_value: Option, ) -> Self { SourceRecord { + source_id, query_id, raw_source_value, operations,