Skip to content

Commit

Permalink
minor refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
prajjwalkumar17 committed Dec 23, 2024
1 parent 140853d commit e47e921
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 69 deletions.
26 changes: 8 additions & 18 deletions crates/router/src/core/currency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,11 @@ pub async fn retrieve_forex(
) -> CustomResult<ApplicationResponse<currency::FxExchangeRatesCacheEntry>, ApiErrorResponse> {
let forex_api = state.conf.forex_api.get_inner();
Ok(ApplicationResponse::Json(
get_forex_rates(
&state,
forex_api.call_delay,
forex_api.local_fetch_retry_delay,
forex_api.local_fetch_retry_count,
)
.await
.change_context(ApiErrorResponse::GenericNotFoundError {
message: "Unable to fetch forex rates".to_string(),
})?,
get_forex_rates(&state, forex_api.call_delay)
.await
.change_context(ApiErrorResponse::GenericNotFoundError {
message: "Unable to fetch forex rates".to_string(),
})?,
))
}

Expand Down Expand Up @@ -53,14 +48,9 @@ pub async fn get_forex_exchange_rates(
state: SessionState,
) -> CustomResult<ExchangeRates, AnalyticsError> {
let forex_api = state.conf.forex_api.get_inner();
let rates = get_forex_rates(
&state,
forex_api.call_delay,
forex_api.local_fetch_retry_delay,
forex_api.local_fetch_retry_count,
)
.await
.change_context(AnalyticsError::ForexFetchFailed)?;
let rates = get_forex_rates(&state, forex_api.call_delay)
.await
.change_context(AnalyticsError::ForexFetchFailed)?;

Ok((*rates.data).clone())
}
75 changes: 24 additions & 51 deletions crates/router/src/utils/currency.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, ops::Deref, str::FromStr, sync::Arc, time::Duration};
use std::{collections::HashMap, ops::Deref, str::FromStr, sync::Arc};

use api_models::enums;
use common_utils::{date_time, errors::CustomResult, events::ApiEventMetric, ext_traits::AsyncExt};
Expand All @@ -10,7 +10,7 @@ use redis_interface::DelReply;
use router_env::{instrument, tracing};
use rust_decimal::Decimal;
use strum::IntoEnumIterator;
use tokio::{sync::RwLock, time::sleep};
use tokio::sync::RwLock;
use tracing_futures::Instrument;

use crate::{
Expand Down Expand Up @@ -55,6 +55,8 @@ pub enum ForexCacheError {
DefaultCurrencyParsingError,
#[error("Entry not found in cache")]
EntryNotFound,
#[error("Forex data unavailable")]
ForexDataUnavailable,
#[error("Expiration time invalid")]
InvalidLogExpiry,
#[error("Error reading local")]
Expand Down Expand Up @@ -120,32 +122,6 @@ async fn save_forex_to_local(
Ok(())
}

// Alternative handler for handling the case, When no data in local as well as redis
#[allow(dead_code)]
async fn waited_fetch_and_update_caches(
state: &SessionState,
local_fetch_retry_delay: u64,
local_fetch_retry_count: u64,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
for _n in 1..local_fetch_retry_count {
sleep(Duration::from_millis(local_fetch_retry_delay)).await;
//read from redis and update local plus break the loop and return
match retrieve_forex_from_redis(state).await {
Ok(Some(rates)) => {
save_forex_to_local(rates.clone()).await?;
return Ok(rates.clone());
}
Ok(None) => continue,
Err(error) => {
logger::error!(?error);
continue;
}
}
}
//acquire lock one last time and try to fetch and update local & redis
successive_fetch_and_save_forex(state, None).await
}

impl TryFrom<DefaultExchangeRates> for ExchangeRates {
type Error = error_stack::Report<ForexCacheError>;
fn try_from(value: DefaultExchangeRates) -> Result<Self, Self::Error> {
Expand Down Expand Up @@ -179,8 +155,6 @@ impl From<Conversion> for CurrencyFactors {
pub async fn get_forex_rates(
state: &SessionState,
call_delay: i64,
local_fetch_retry_delay: u64,
local_fetch_retry_count: u64,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
if let Some(local_rates) = retrieve_forex_from_local().await {
if local_rates.is_expired(call_delay) {
Expand All @@ -192,31 +166,26 @@ pub async fn get_forex_rates(
}
} else {
// No data in local
handler_local_no_data(
state,
call_delay,
local_fetch_retry_delay,
local_fetch_retry_count,
)
.await
handler_local_no_data(state, call_delay).await
}
}

async fn handler_local_no_data(
state: &SessionState,
call_delay: i64,
_local_fetch_retry_delay: u64,
_local_fetch_retry_count: u64,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
match retrieve_forex_from_redis(state).await {
Ok(Some(data)) => fallback_forex_redis_check(state, data, call_delay).await,
Ok(None) => {
// No data in local as well as redis
Ok(successive_fetch_and_save_forex(state, None).await?)
successive_fetch_and_save_forex(state, None).await?;
Err(ForexCacheError::ForexDataUnavailable.into())
}
Err(error) => {
// Error in deriving forex rates from redis
logger::error!(?error);
Ok(successive_fetch_and_save_forex(state, None).await?)
successive_fetch_and_save_forex(state, None).await?;
Err(ForexCacheError::ForexDataUnavailable.into())
}
}
}
Expand All @@ -228,12 +197,14 @@ async fn successive_fetch_and_save_forex(
// spawn a new thread and do the api fetch and write operations on redis.
let stale_forex_data = stale_redis_data.clone();
let state = state.clone();
println!(">>>>>>>>>>>>>>>>>>>>>>>code -1");
tokio::spawn(
async move {
acquire_redis_lock_and_fetch_data(&state, stale_redis_data).await;
}
.in_current_span(),
);
println!(">>>>>>>>>>>>>>>>>>>>>>>code 8");
stale_forex_data.ok_or(ForexCacheError::EntryNotFound.into())
}

Expand All @@ -246,9 +217,13 @@ async fn acquire_redis_lock_and_fetch_data(
if !lock_acquired {
return stale_redis_data.ok_or(ForexCacheError::CouldNotAcquireLock.into());
}
println!(">>>>>>>>>>>>>>>>>>>>>>>code 0");
let api_rates = fetch_forex_rates(state).await;
match api_rates {
Ok(rates) => successive_save_data_to_redis_local(state, rates).await,
Ok(rates) => {
println!(">>>>>>>>>>>>>>>>>>>>>>>code 1");
successive_save_data_to_redis_local(state, rates).await
}
Err(error) => {
// API not able to fetch data call secondary service
logger::error!(?error);
Expand All @@ -265,6 +240,7 @@ async fn acquire_redis_lock_and_fetch_data(
}
}
Err(error) => stale_redis_data.ok_or({
println!(">>>>>>>>>>>>>>>>>>>>>>>code 2");
logger::error!(?error);
ForexCacheError::ApiUnresponsive.into()
}),
Expand Down Expand Up @@ -304,6 +280,7 @@ async fn fallback_forex_redis_check(
}
None => {
// redis expired
println!(">>>>>>>>>>>>>>>>redis expired 2");
successive_fetch_and_save_forex(state, Some(redis_data)).await
}
}
Expand All @@ -326,6 +303,7 @@ async fn handler_local_expired(
}
None => {
// Redis is expired going for API request
println!(">>>>>>>>>>>>>>>>redis expired 1");
successive_fetch_and_save_forex(state, Some(local_rates)).await
}
}
Expand Down Expand Up @@ -498,7 +476,7 @@ async fn acquire_redis_lock(state: &SessionState) -> CustomResult<bool, ForexCac
REDIX_FOREX_CACHE_KEY,
"",
Some(
i64::try_from(forex_api.local_fetch_retry_delay)
i64::try_from(forex_api.redis_lock_timeout)
.change_context(ForexCacheError::ConversionError)?,
),
)
Expand Down Expand Up @@ -556,14 +534,9 @@ pub async fn convert_currency(
from_currency: String,
) -> CustomResult<api_models::currency::CurrencyConversionResponse, ForexCacheError> {
let forex_api = state.conf.forex_api.get_inner();
let rates = get_forex_rates(
&state,
forex_api.call_delay,
forex_api.local_fetch_retry_delay,
forex_api.local_fetch_retry_count,
)
.await
.change_context(ForexCacheError::ApiError)?;
let rates = get_forex_rates(&state, forex_api.call_delay)
.await
.change_context(ForexCacheError::ApiError)?;

let to_currency = enums::Currency::from_str(to_currency.as_str())
.change_context(ForexCacheError::CurrencyNotAcceptable)
Expand Down

0 comments on commit e47e921

Please sign in to comment.