From e47e9216c5e1b65a879c2e4d6caa5997a0fe8d41 Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Mon, 23 Dec 2024 11:35:40 +0530 Subject: [PATCH] minor refactors --- crates/router/src/core/currency.rs | 26 +++------- crates/router/src/utils/currency.rs | 75 +++++++++-------------------- 2 files changed, 32 insertions(+), 69 deletions(-) diff --git a/crates/router/src/core/currency.rs b/crates/router/src/core/currency.rs index 912484b014a7..8c1a85128929 100644 --- a/crates/router/src/core/currency.rs +++ b/crates/router/src/core/currency.rs @@ -15,16 +15,11 @@ pub async fn retrieve_forex( ) -> CustomResult, ApiErrorResponse> { let forex_api = state.conf.forex_api.get_inner(); Ok(ApplicationResponse::Json( - get_forex_rates( - &state, - forex_api.call_delay, - forex_api.local_fetch_retry_delay, - forex_api.local_fetch_retry_count, - ) - .await - .change_context(ApiErrorResponse::GenericNotFoundError { - message: "Unable to fetch forex rates".to_string(), - })?, + get_forex_rates(&state, forex_api.call_delay) + .await + .change_context(ApiErrorResponse::GenericNotFoundError { + message: "Unable to fetch forex rates".to_string(), + })?, )) } @@ -53,14 +48,9 @@ pub async fn get_forex_exchange_rates( state: SessionState, ) -> CustomResult { let forex_api = state.conf.forex_api.get_inner(); - let rates = get_forex_rates( - &state, - forex_api.call_delay, - forex_api.local_fetch_retry_delay, - forex_api.local_fetch_retry_count, - ) - .await - .change_context(AnalyticsError::ForexFetchFailed)?; + let rates = get_forex_rates(&state, forex_api.call_delay) + .await + .change_context(AnalyticsError::ForexFetchFailed)?; Ok((*rates.data).clone()) } diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index f0e65adc3e0f..18f731273e3b 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, ops::Deref, str::FromStr, sync::Arc, time::Duration}; +use std::{collections::HashMap, ops::Deref, str::FromStr, sync::Arc}; use api_models::enums; use common_utils::{date_time, errors::CustomResult, events::ApiEventMetric, ext_traits::AsyncExt}; @@ -10,7 +10,7 @@ use redis_interface::DelReply; use router_env::{instrument, tracing}; use rust_decimal::Decimal; use strum::IntoEnumIterator; -use tokio::{sync::RwLock, time::sleep}; +use tokio::sync::RwLock; use tracing_futures::Instrument; use crate::{ @@ -55,6 +55,8 @@ pub enum ForexCacheError { DefaultCurrencyParsingError, #[error("Entry not found in cache")] EntryNotFound, + #[error("Forex data unavailable")] + ForexDataUnavailable, #[error("Expiration time invalid")] InvalidLogExpiry, #[error("Error reading local")] @@ -120,32 +122,6 @@ async fn save_forex_to_local( Ok(()) } -// Alternative handler for handling the case, When no data in local as well as redis -#[allow(dead_code)] -async fn waited_fetch_and_update_caches( - state: &SessionState, - local_fetch_retry_delay: u64, - local_fetch_retry_count: u64, -) -> CustomResult { - for _n in 1..local_fetch_retry_count { - sleep(Duration::from_millis(local_fetch_retry_delay)).await; - //read from redis and update local plus break the loop and return - match retrieve_forex_from_redis(state).await { - Ok(Some(rates)) => { - save_forex_to_local(rates.clone()).await?; - return Ok(rates.clone()); - } - Ok(None) => continue, - Err(error) => { - logger::error!(?error); - continue; - } - } - } - //acquire lock one last time and try to fetch and update local & redis - successive_fetch_and_save_forex(state, None).await -} - impl TryFrom for ExchangeRates { type Error = error_stack::Report; fn try_from(value: DefaultExchangeRates) -> Result { @@ -179,8 +155,6 @@ impl From for CurrencyFactors { pub async fn get_forex_rates( state: &SessionState, call_delay: i64, - local_fetch_retry_delay: u64, - local_fetch_retry_count: u64, ) -> CustomResult { if let Some(local_rates) = retrieve_forex_from_local().await { if local_rates.is_expired(call_delay) { @@ -192,31 +166,26 @@ pub async fn get_forex_rates( } } else { // No data in local - handler_local_no_data( - state, - call_delay, - local_fetch_retry_delay, - local_fetch_retry_count, - ) - .await + handler_local_no_data(state, call_delay).await } } async fn handler_local_no_data( state: &SessionState, call_delay: i64, - _local_fetch_retry_delay: u64, - _local_fetch_retry_count: u64, ) -> CustomResult { match retrieve_forex_from_redis(state).await { Ok(Some(data)) => fallback_forex_redis_check(state, data, call_delay).await, Ok(None) => { // No data in local as well as redis - Ok(successive_fetch_and_save_forex(state, None).await?) + successive_fetch_and_save_forex(state, None).await?; + Err(ForexCacheError::ForexDataUnavailable.into()) } Err(error) => { + // Error in deriving forex rates from redis logger::error!(?error); - Ok(successive_fetch_and_save_forex(state, None).await?) + successive_fetch_and_save_forex(state, None).await?; + Err(ForexCacheError::ForexDataUnavailable.into()) } } } @@ -228,12 +197,14 @@ async fn successive_fetch_and_save_forex( // spawn a new thread and do the api fetch and write operations on redis. let stale_forex_data = stale_redis_data.clone(); let state = state.clone(); + println!(">>>>>>>>>>>>>>>>>>>>>>>code -1"); tokio::spawn( async move { acquire_redis_lock_and_fetch_data(&state, stale_redis_data).await; } .in_current_span(), ); + println!(">>>>>>>>>>>>>>>>>>>>>>>code 8"); stale_forex_data.ok_or(ForexCacheError::EntryNotFound.into()) } @@ -246,9 +217,13 @@ async fn acquire_redis_lock_and_fetch_data( if !lock_acquired { return stale_redis_data.ok_or(ForexCacheError::CouldNotAcquireLock.into()); } + println!(">>>>>>>>>>>>>>>>>>>>>>>code 0"); let api_rates = fetch_forex_rates(state).await; match api_rates { - Ok(rates) => successive_save_data_to_redis_local(state, rates).await, + Ok(rates) => { + println!(">>>>>>>>>>>>>>>>>>>>>>>code 1"); + successive_save_data_to_redis_local(state, rates).await + } Err(error) => { // API not able to fetch data call secondary service logger::error!(?error); @@ -265,6 +240,7 @@ async fn acquire_redis_lock_and_fetch_data( } } Err(error) => stale_redis_data.ok_or({ + println!(">>>>>>>>>>>>>>>>>>>>>>>code 2"); logger::error!(?error); ForexCacheError::ApiUnresponsive.into() }), @@ -304,6 +280,7 @@ async fn fallback_forex_redis_check( } None => { // redis expired + println!(">>>>>>>>>>>>>>>>redis expired 2"); successive_fetch_and_save_forex(state, Some(redis_data)).await } } @@ -326,6 +303,7 @@ async fn handler_local_expired( } None => { // Redis is expired going for API request + println!(">>>>>>>>>>>>>>>>redis expired 1"); successive_fetch_and_save_forex(state, Some(local_rates)).await } } @@ -498,7 +476,7 @@ async fn acquire_redis_lock(state: &SessionState) -> CustomResult CustomResult { let forex_api = state.conf.forex_api.get_inner(); - let rates = get_forex_rates( - &state, - forex_api.call_delay, - forex_api.local_fetch_retry_delay, - forex_api.local_fetch_retry_count, - ) - .await - .change_context(ForexCacheError::ApiError)?; + let rates = get_forex_rates(&state, forex_api.call_delay) + .await + .change_context(ForexCacheError::ApiError)?; let to_currency = enums::Currency::from_str(to_currency.as_str()) .change_context(ForexCacheError::CurrencyNotAcceptable)