Skip to content

Commit

Permalink
feat(analytics): implement currency conversion to power multi-currenc…
Browse files Browse the repository at this point in the history
…y aggregation (#6418)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and Sayak Bhattacharya committed Nov 26, 2024
1 parent 103c1a3 commit df9169b
Show file tree
Hide file tree
Showing 18 changed files with 273 additions and 24 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/analytics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ hyperswitch_interfaces = { version = "0.1.0", path = "../hyperswitch_interfaces"
masking = { version = "0.1.0", path = "../masking" }
router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] }
storage_impl = { version = "0.1.0", path = "../storage_impl", default-features = false }
currency_conversion = { version = "0.1.0", path = "../currency_conversion" }

#Third Party dependencies
actix-web = "4.5.1"
Expand All @@ -34,6 +35,7 @@ futures = "0.3.30"
once_cell = "1.19.0"
opensearch = { version = "2.2.0", features = ["aws-auth"] }
reqwest = { version = "0.11.27", features = ["serde_json"] }
rust_decimal = "1.35"
serde = { version = "1.0.197", features = ["derive", "rc"] }
serde_json = "1.0.115"
sqlx = { version = "0.8.2", features = ["postgres", "runtime-tokio", "runtime-tokio-native-tls", "time", "bigdecimal"] }
Expand Down
8 changes: 8 additions & 0 deletions crates/analytics/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub enum AnalyticsError {
UnknownError,
#[error("Access Forbidden Analytics Error")]
AccessForbiddenError,
#[error("Failed to fetch currency exchange rate")]
ForexFetchFailed,
}

impl ErrorSwitch<ApiErrorResponse> for AnalyticsError {
Expand All @@ -32,6 +34,12 @@ impl ErrorSwitch<ApiErrorResponse> for AnalyticsError {
Self::AccessForbiddenError => {
ApiErrorResponse::Unauthorized(ApiError::new("IR", 0, "Access Forbidden", None))
}
Self::ForexFetchFailed => ApiErrorResponse::InternalServerError(ApiError::new(
"HE",
0,
"Failed to fetch currency exchange rate",
None,
)),
}
}
}
29 changes: 24 additions & 5 deletions crates/analytics/src/payment_intents/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl PaymentIntentMetricAccumulator for CountAccumulator {
}

impl PaymentIntentMetricAccumulator for SmartRetriedAmountAccumulator {
type MetricOutput = (Option<u64>, Option<u64>);
type MetricOutput = (Option<u64>, Option<u64>, Option<u64>, Option<u64>);
#[inline]
fn add_metrics_bucket(&mut self, metrics: &PaymentIntentMetricRow) {
self.amount = match (
Expand Down Expand Up @@ -117,7 +117,7 @@ impl PaymentIntentMetricAccumulator for SmartRetriedAmountAccumulator {
.amount_without_retries
.and_then(|i| u64::try_from(i).ok())
.or(Some(0));
(with_retries, without_retries)
(with_retries, without_retries, Some(0), Some(0))
}
}

Expand Down Expand Up @@ -185,7 +185,14 @@ impl PaymentIntentMetricAccumulator for PaymentsSuccessRateAccumulator {
}

impl PaymentIntentMetricAccumulator for ProcessedAmountAccumulator {
type MetricOutput = (Option<u64>, Option<u64>, Option<u64>, Option<u64>);
type MetricOutput = (
Option<u64>,
Option<u64>,
Option<u64>,
Option<u64>,
Option<u64>,
Option<u64>,
);
#[inline]
fn add_metrics_bucket(&mut self, metrics: &PaymentIntentMetricRow) {
self.total_with_retries = match (
Expand Down Expand Up @@ -235,6 +242,8 @@ impl PaymentIntentMetricAccumulator for ProcessedAmountAccumulator {
count_with_retries,
total_without_retries,
count_without_retries,
Some(0),
Some(0),
)
}
}
Expand Down Expand Up @@ -301,13 +310,19 @@ impl PaymentIntentMetricsAccumulator {
payments_success_rate,
payments_success_rate_without_smart_retries,
) = self.payments_success_rate.collect();
let (smart_retried_amount, smart_retried_amount_without_smart_retries) =
self.smart_retried_amount.collect();
let (
smart_retried_amount,
smart_retried_amount_without_smart_retries,
smart_retried_amount_in_usd,
smart_retried_amount_without_smart_retries_in_usd,
) = self.smart_retried_amount.collect();
let (
payment_processed_amount,
payment_processed_count,
payment_processed_amount_without_smart_retries,
payment_processed_count_without_smart_retries,
payment_processed_amount_in_usd,
payment_processed_amount_without_smart_retries_in_usd,
) = self.payment_processed_amount.collect();
let (
payments_success_rate_distribution_without_smart_retries,
Expand All @@ -317,7 +332,9 @@ impl PaymentIntentMetricsAccumulator {
successful_smart_retries: self.successful_smart_retries.collect(),
total_smart_retries: self.total_smart_retries.collect(),
smart_retried_amount,
smart_retried_amount_in_usd,
smart_retried_amount_without_smart_retries,
smart_retried_amount_without_smart_retries_in_usd,
payment_intent_count: self.payment_intent_count.collect(),
successful_payments,
successful_payments_without_smart_retries,
Expand All @@ -330,6 +347,8 @@ impl PaymentIntentMetricsAccumulator {
payment_processed_count_without_smart_retries,
payments_success_rate_distribution_without_smart_retries,
payments_failure_rate_distribution_without_smart_retries,
payment_processed_amount_in_usd,
payment_processed_amount_without_smart_retries_in_usd,
}
}
}
94 changes: 92 additions & 2 deletions crates/analytics/src/payment_intents/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use api_models::analytics::{
PaymentIntentFiltersResponse, PaymentIntentsAnalyticsMetadata, PaymentIntentsMetricsResponse,
SankeyResponse,
};
use common_enums::IntentStatus;
use bigdecimal::ToPrimitive;
use common_enums::{Currency, IntentStatus};
use common_utils::{errors::CustomResult, types::TimeRange};
use currency_conversion::{conversion::convert, types::ExchangeRates};
use error_stack::ResultExt;
use router_env::{
instrument, logger,
Expand Down Expand Up @@ -120,6 +122,7 @@ pub async fn get_sankey(
#[instrument(skip_all)]
pub async fn get_metrics(
pool: &AnalyticsProvider,
ex_rates: &ExchangeRates,
auth: &AuthInfo,
req: GetPaymentIntentMetricRequest,
) -> AnalyticsResult<PaymentIntentsMetricsResponse<MetricsBucketResponse>> {
Expand Down Expand Up @@ -227,16 +230,20 @@ pub async fn get_metrics(
let mut success = 0;
let mut success_without_smart_retries = 0;
let mut total_smart_retried_amount = 0;
let mut total_smart_retried_amount_in_usd = 0;
let mut total_smart_retried_amount_without_smart_retries = 0;
let mut total_smart_retried_amount_without_smart_retries_in_usd = 0;
let mut total = 0;
let mut total_payment_processed_amount = 0;
let mut total_payment_processed_amount_in_usd = 0;
let mut total_payment_processed_count = 0;
let mut total_payment_processed_amount_without_smart_retries = 0;
let mut total_payment_processed_amount_without_smart_retries_in_usd = 0;
let mut total_payment_processed_count_without_smart_retries = 0;
let query_data: Vec<MetricsBucketResponse> = metrics_accumulator
.into_iter()
.map(|(id, val)| {
let collected_values = val.collect();
let mut collected_values = val.collect();
if let Some(success_count) = collected_values.successful_payments {
success += success_count;
}
Expand All @@ -248,20 +255,95 @@ pub async fn get_metrics(
total += total_count;
}
if let Some(retried_amount) = collected_values.smart_retried_amount {
let amount_in_usd = id
.currency
.and_then(|currency| {
i64::try_from(retried_amount)
.inspect_err(|e| logger::error!("Amount conversion error: {:?}", e))
.ok()
.and_then(|amount_i64| {
convert(ex_rates, currency, Currency::USD, amount_i64)
.inspect_err(|e| {
logger::error!("Currency conversion error: {:?}", e)
})
.ok()
})
})
.map(|amount| (amount * rust_decimal::Decimal::new(100, 0)).to_u64())
.unwrap_or_default();
collected_values.smart_retried_amount_in_usd = amount_in_usd;
total_smart_retried_amount += retried_amount;
total_smart_retried_amount_in_usd += amount_in_usd.unwrap_or(0);
}
if let Some(retried_amount) =
collected_values.smart_retried_amount_without_smart_retries
{
let amount_in_usd = id
.currency
.and_then(|currency| {
i64::try_from(retried_amount)
.inspect_err(|e| logger::error!("Amount conversion error: {:?}", e))
.ok()
.and_then(|amount_i64| {
convert(ex_rates, currency, Currency::USD, amount_i64)
.inspect_err(|e| {
logger::error!("Currency conversion error: {:?}", e)
})
.ok()
})
})
.map(|amount| (amount * rust_decimal::Decimal::new(100, 0)).to_u64())
.unwrap_or_default();
collected_values.smart_retried_amount_without_smart_retries_in_usd = amount_in_usd;
total_smart_retried_amount_without_smart_retries += retried_amount;
total_smart_retried_amount_without_smart_retries_in_usd +=
amount_in_usd.unwrap_or(0);
}
if let Some(amount) = collected_values.payment_processed_amount {
let amount_in_usd = id
.currency
.and_then(|currency| {
i64::try_from(amount)
.inspect_err(|e| logger::error!("Amount conversion error: {:?}", e))
.ok()
.and_then(|amount_i64| {
convert(ex_rates, currency, Currency::USD, amount_i64)
.inspect_err(|e| {
logger::error!("Currency conversion error: {:?}", e)
})
.ok()
})
})
.map(|amount| (amount * rust_decimal::Decimal::new(100, 0)).to_u64())
.unwrap_or_default();
collected_values.payment_processed_amount_in_usd = amount_in_usd;
total_payment_processed_amount_in_usd += amount_in_usd.unwrap_or(0);
total_payment_processed_amount += amount;
}
if let Some(count) = collected_values.payment_processed_count {
total_payment_processed_count += count;
}
if let Some(amount) = collected_values.payment_processed_amount_without_smart_retries {
let amount_in_usd = id
.currency
.and_then(|currency| {
i64::try_from(amount)
.inspect_err(|e| logger::error!("Amount conversion error: {:?}", e))
.ok()
.and_then(|amount_i64| {
convert(ex_rates, currency, Currency::USD, amount_i64)
.inspect_err(|e| {
logger::error!("Currency conversion error: {:?}", e)
})
.ok()
})
})
.map(|amount| (amount * rust_decimal::Decimal::new(100, 0)).to_u64())
.unwrap_or_default();
collected_values.payment_processed_amount_without_smart_retries_in_usd =
amount_in_usd;
total_payment_processed_amount_without_smart_retries_in_usd +=
amount_in_usd.unwrap_or(0);
total_payment_processed_amount_without_smart_retries += amount;
}
if let Some(count) = collected_values.payment_processed_count_without_smart_retries {
Expand Down Expand Up @@ -294,6 +376,14 @@ pub async fn get_metrics(
total_payment_processed_amount_without_smart_retries: Some(
total_payment_processed_amount_without_smart_retries,
),
total_smart_retried_amount_in_usd: Some(total_smart_retried_amount_in_usd),
total_smart_retried_amount_without_smart_retries_in_usd: Some(
total_smart_retried_amount_without_smart_retries_in_usd,
),
total_payment_processed_amount_in_usd: Some(total_payment_processed_amount_in_usd),
total_payment_processed_amount_without_smart_retries_in_usd: Some(
total_payment_processed_amount_without_smart_retries_in_usd,
),
total_payment_processed_count: Some(total_payment_processed_count),
total_payment_processed_count_without_smart_retries: Some(
total_payment_processed_count_without_smart_retries,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ where
query_builder
.add_select_column("attempt_count == 1 as first_attempt")
.switch()?;

query_builder.add_select_column("currency").switch()?;
query_builder
.add_select_column(Aggregate::Sum {
field: "amount",
Expand Down Expand Up @@ -101,7 +101,10 @@ where
.add_group_by_clause("attempt_count")
.attach_printable("Error grouping by attempt_count")
.switch()?;

query_builder
.add_group_by_clause("currency")
.attach_printable("Error grouping by currency")
.switch()?;
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ where
.add_select_column("attempt_count == 1 as first_attempt")
.switch()?;

query_builder.add_select_column("currency").switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
Expand Down Expand Up @@ -102,7 +103,10 @@ where
.add_group_by_clause("first_attempt")
.attach_printable("Error grouping by first_attempt")
.switch()?;

query_builder
.add_group_by_clause("currency")
.attach_printable("Error grouping by first_attempt")
.switch()?;
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ where
query_builder
.add_select_column("attempt_count == 1 as first_attempt")
.switch()?;

query_builder.add_select_column("currency").switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
Expand Down Expand Up @@ -102,7 +102,10 @@ where
.add_group_by_clause("first_attempt")
.attach_printable("Error grouping by first_attempt")
.switch()?;

query_builder
.add_group_by_clause("currency")
.attach_printable("Error grouping by currency")
.switch()?;
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
Expand Down
15 changes: 14 additions & 1 deletion crates/analytics/src/payments/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,14 @@ impl PaymentMetricAccumulator for CountAccumulator {
}

impl PaymentMetricAccumulator for ProcessedAmountAccumulator {
type MetricOutput = (Option<u64>, Option<u64>, Option<u64>, Option<u64>);
type MetricOutput = (
Option<u64>,
Option<u64>,
Option<u64>,
Option<u64>,
Option<u64>,
Option<u64>,
);
#[inline]
fn add_metrics_bucket(&mut self, metrics: &PaymentMetricRow) {
self.total_with_retries = match (
Expand Down Expand Up @@ -322,6 +329,8 @@ impl PaymentMetricAccumulator for ProcessedAmountAccumulator {
count_with_retries,
total_without_retries,
count_without_retries,
Some(0),
Some(0),
)
}
}
Expand Down Expand Up @@ -378,6 +387,8 @@ impl PaymentMetricsAccumulator {
payment_processed_count,
payment_processed_amount_without_smart_retries,
payment_processed_count_without_smart_retries,
payment_processed_amount_usd,
payment_processed_amount_without_smart_retries_usd,
) = self.processed_amount.collect();
let (
payments_success_rate_distribution,
Expand Down Expand Up @@ -406,6 +417,8 @@ impl PaymentMetricsAccumulator {
payments_failure_rate_distribution_without_smart_retries,
failure_reason_count,
failure_reason_count_without_smart_retries,
payment_processed_amount_usd,
payment_processed_amount_without_smart_retries_usd,
}
}
}
Loading

0 comments on commit df9169b

Please sign in to comment.