diff --git a/crates/diesel_models/src/address.rs b/crates/diesel_models/src/address.rs index 828a34f9a289..028d878f7da9 100644 --- a/crates/diesel_models/src/address.rs +++ b/crates/diesel_models/src/address.rs @@ -1,9 +1,10 @@ use diesel::{AsChangeset, Identifiable, Insertable, Queryable}; +use serde::{Deserialize, Serialize}; use time::PrimitiveDateTime; use crate::{encryption::Encryption, enums, schema::address}; -#[derive(Clone, Debug, Insertable, router_derive::DebugAsDisplay)] +#[derive(Clone, Debug, Insertable, Serialize, Deserialize, router_derive::DebugAsDisplay)] #[diesel(table_name = address)] pub struct AddressNew { pub address_id: String, @@ -25,7 +26,7 @@ pub struct AddressNew { pub modified_at: PrimitiveDateTime, } -#[derive(Clone, Debug, Queryable, Identifiable)] +#[derive(Clone, Debug, Queryable, Identifiable, Serialize, Deserialize)] #[diesel(table_name = address, primary_key(address_id))] pub struct Address { pub id: Option, diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index 9a416d6eb04e..eba4ce243ae5 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -2,6 +2,7 @@ use error_stack::{IntoReport, ResultExt}; use serde::{Deserialize, Serialize}; use crate::{ + address::AddressNew, errors, payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate}, payment_intent::{PaymentIntent, PaymentIntentNew, PaymentIntentUpdate}, @@ -39,6 +40,7 @@ pub enum Insertable { PaymentIntent(PaymentIntentNew), PaymentAttempt(PaymentAttemptNew), Refund(RefundNew), + Address(Box), } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index 4f147286c079..ad77ebec4e2f 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -148,6 +148,7 @@ async fn drainer( let payment_intent = "payment_intent"; let payment_attempt = "payment_attempt"; let refund = "refund"; + let address = "address"; match db_op { // TODO: Handle errors kv::DBOperation::Insert { insertable } => { @@ -170,6 +171,9 @@ async fn drainer( kv::Insertable::Refund(a) => { macro_util::handle_resp!(a.insert(&conn).await, insert_op, refund) } + kv::Insertable::Address(addr) => { + macro_util::handle_resp!(addr.insert(&conn).await, insert_op, address) + } } }) .await; diff --git a/crates/router/src/core/payment_methods/cards.rs b/crates/router/src/core/payment_methods/cards.rs index 24cbeec31ee8..ee790c951159 100644 --- a/crates/router/src/core/payment_methods/cards.rs +++ b/crates/router/src/core/payment_methods/cards.rs @@ -802,6 +802,7 @@ pub async fn list_payment_methods( &key_store, pi.payment_id.clone(), merchant_account.merchant_id.clone(), + merchant_account.storage_scheme, ) .await }) @@ -818,6 +819,7 @@ pub async fn list_payment_methods( &key_store, pi.payment_id.clone(), merchant_account.merchant_id.clone(), + merchant_account.storage_scheme, ) .await }) diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 2442a2028652..f4b7d31de195 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -104,6 +104,7 @@ pub fn filter_mca_based_on_business_profile( } #[instrument(skip_all)] +#[allow(clippy::too_many_arguments)] pub async fn create_or_find_address_for_payment_by_request( db: &dyn StorageInterface, req_address: Option<&api::Address>, @@ -112,6 +113,7 @@ pub async fn create_or_find_address_for_payment_by_request( customer_id: Option<&String>, merchant_key_store: &domain::MerchantKeyStore, payment_id: &str, + storage_scheme: storage_enums::MerchantStorageScheme, ) -> CustomResult, errors::ApiErrorResponse> { let key = merchant_key_store.key.get_inner().peek(); @@ -122,6 +124,7 @@ pub async fn create_or_find_address_for_payment_by_request( payment_id, id, merchant_key_store, + storage_scheme, ) .await, ) @@ -148,6 +151,7 @@ pub async fn create_or_find_address_for_payment_by_request( .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed while encrypting address while insert")?, merchant_key_store, + storage_scheme, ) .await .change_context(errors::ApiErrorResponse::InternalServerError) @@ -224,6 +228,7 @@ pub async fn get_address_by_id( merchant_key_store: &domain::MerchantKeyStore, payment_id: String, merchant_id: String, + storage_scheme: storage_enums::MerchantStorageScheme, ) -> CustomResult, errors::ApiErrorResponse> { match address_id { None => Ok(None), @@ -233,6 +238,7 @@ pub async fn get_address_by_id( &payment_id, &address_id, merchant_key_store, + storage_scheme, ) .await .ok()), diff --git a/crates/router/src/core/payments/operations/payment_approve.rs b/crates/router/src/core/payments/operations/payment_approve.rs index 708a6ebb90f2..871e906ee9df 100644 --- a/crates/router/src/core/payments/operations/payment_approve.rs +++ b/crates/router/src/core/payments/operations/payment_approve.rs @@ -144,6 +144,7 @@ impl GetTracker, api::PaymentsRequest> for Pa payment_intent.customer_id.as_ref(), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; let billing_address = helpers::create_or_find_address_for_payment_by_request( @@ -154,6 +155,7 @@ impl GetTracker, api::PaymentsRequest> for Pa payment_intent.customer_id.as_ref(), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; diff --git a/crates/router/src/core/payments/operations/payment_cancel.rs b/crates/router/src/core/payments/operations/payment_cancel.rs index 7efeb9a17679..f2964a4e48c0 100644 --- a/crates/router/src/core/payments/operations/payment_cancel.rs +++ b/crates/router/src/core/payments/operations/payment_cancel.rs @@ -87,6 +87,7 @@ impl GetTracker, api::PaymentsCancelRequest> payment_intent.customer_id.as_ref(), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; @@ -98,6 +99,7 @@ impl GetTracker, api::PaymentsCancelRequest> payment_intent.customer_id.as_ref(), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; diff --git a/crates/router/src/core/payments/operations/payment_capture.rs b/crates/router/src/core/payments/operations/payment_capture.rs index 1c03f42757a2..e270a8df4954 100644 --- a/crates/router/src/core/payments/operations/payment_capture.rs +++ b/crates/router/src/core/payments/operations/payment_capture.rs @@ -161,6 +161,7 @@ impl GetTracker, api::PaymentsCaptu payment_intent.customer_id.as_ref(), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; @@ -172,6 +173,7 @@ impl GetTracker, api::PaymentsCaptu payment_intent.customer_id.as_ref(), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; diff --git a/crates/router/src/core/payments/operations/payment_complete_authorize.rs b/crates/router/src/core/payments/operations/payment_complete_authorize.rs index e72342e3c41f..1ee7bec135ff 100644 --- a/crates/router/src/core/payments/operations/payment_complete_authorize.rs +++ b/crates/router/src/core/payments/operations/payment_complete_authorize.rs @@ -153,6 +153,7 @@ impl GetTracker, api::PaymentsRequest> for Co payment_intent.customer_id.as_ref(), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; let billing_address = helpers::create_or_find_address_for_payment_by_request( @@ -163,6 +164,7 @@ impl GetTracker, api::PaymentsRequest> for Co payment_intent.customer_id.as_ref(), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; diff --git a/crates/router/src/core/payments/operations/payment_confirm.rs b/crates/router/src/core/payments/operations/payment_confirm.rs index 19c63cc78603..5ffc07ba5c81 100644 --- a/crates/router/src/core/payments/operations/payment_confirm.rs +++ b/crates/router/src/core/payments/operations/payment_confirm.rs @@ -115,6 +115,7 @@ impl GetTracker, api::PaymentsRequest> for Pa .or(customer_details.customer_id.as_ref()), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ); let billing_address_fut = helpers::create_or_find_address_for_payment_by_request( @@ -128,6 +129,7 @@ impl GetTracker, api::PaymentsRequest> for Pa .or(customer_details.customer_id.as_ref()), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ); let config_update_fut = request diff --git a/crates/router/src/core/payments/operations/payment_create.rs b/crates/router/src/core/payments/operations/payment_create.rs index f877fc376220..831090c14cdd 100644 --- a/crates/router/src/core/payments/operations/payment_create.rs +++ b/crates/router/src/core/payments/operations/payment_create.rs @@ -100,6 +100,7 @@ impl GetTracker, api::PaymentsRequest> for Pa customer_details.customer_id.as_ref(), merchant_key_store, &payment_id, + merchant_account.storage_scheme, ) .await?; @@ -111,6 +112,7 @@ impl GetTracker, api::PaymentsRequest> for Pa customer_details.customer_id.as_ref(), merchant_key_store, &payment_id, + merchant_account.storage_scheme, ) .await?; diff --git a/crates/router/src/core/payments/operations/payment_reject.rs b/crates/router/src/core/payments/operations/payment_reject.rs index 61a9aede297b..fe52d1b89d54 100644 --- a/crates/router/src/core/payments/operations/payment_reject.rs +++ b/crates/router/src/core/payments/operations/payment_reject.rs @@ -85,6 +85,7 @@ impl GetTracker, PaymentsRejectRequest> for P payment_intent.customer_id.as_ref(), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; @@ -96,6 +97,7 @@ impl GetTracker, PaymentsRejectRequest> for P payment_intent.customer_id.as_ref(), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; diff --git a/crates/router/src/core/payments/operations/payment_session.rs b/crates/router/src/core/payments/operations/payment_session.rs index 47c95d8b79dd..fb65a5507d90 100644 --- a/crates/router/src/core/payments/operations/payment_session.rs +++ b/crates/router/src/core/payments/operations/payment_session.rs @@ -98,6 +98,7 @@ impl GetTracker, api::PaymentsSessionRequest> payment_intent.customer_id.as_ref(), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; @@ -109,6 +110,7 @@ impl GetTracker, api::PaymentsSessionRequest> payment_intent.customer_id.as_ref(), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; diff --git a/crates/router/src/core/payments/operations/payment_start.rs b/crates/router/src/core/payments/operations/payment_start.rs index 400678506f9a..2d34d4099791 100644 --- a/crates/router/src/core/payments/operations/payment_start.rs +++ b/crates/router/src/core/payments/operations/payment_start.rs @@ -93,6 +93,7 @@ impl GetTracker, api::PaymentsStartRequest> f payment_intent.customer_id.as_ref(), mechant_key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; let billing_address = helpers::create_or_find_address_for_payment_by_request( @@ -103,6 +104,7 @@ impl GetTracker, api::PaymentsStartRequest> f payment_intent.customer_id.as_ref(), mechant_key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; diff --git a/crates/router/src/core/payments/operations/payment_status.rs b/crates/router/src/core/payments/operations/payment_status.rs index a29d1c14d56e..80830efd13d7 100644 --- a/crates/router/src/core/payments/operations/payment_status.rs +++ b/crates/router/src/core/payments/operations/payment_status.rs @@ -243,6 +243,7 @@ async fn get_tracker_for_sync< mechant_key_store, payment_intent.payment_id.clone(), merchant_account.merchant_id.clone(), + merchant_account.storage_scheme, ) .await?; let billing_address = helpers::get_address_by_id( @@ -251,6 +252,7 @@ async fn get_tracker_for_sync< mechant_key_store, payment_intent.payment_id.clone(), merchant_account.merchant_id.clone(), + merchant_account.storage_scheme, ) .await?; diff --git a/crates/router/src/core/payments/operations/payment_update.rs b/crates/router/src/core/payments/operations/payment_update.rs index 318c4e8ad7bf..800abd2cb039 100644 --- a/crates/router/src/core/payments/operations/payment_update.rs +++ b/crates/router/src/core/payments/operations/payment_update.rs @@ -157,6 +157,7 @@ impl GetTracker, api::PaymentsRequest> for Pa .or(customer_details.customer_id.as_ref()), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; let billing_address = helpers::create_or_find_address_for_payment_by_request( @@ -170,6 +171,7 @@ impl GetTracker, api::PaymentsRequest> for Pa .or(customer_details.customer_id.as_ref()), key_store, &payment_intent.payment_id, + merchant_account.storage_scheme, ) .await?; diff --git a/crates/router/src/core/payouts.rs b/crates/router/src/core/payouts.rs index a640478d8d5e..a487700a3646 100644 --- a/crates/router/src/core/payouts.rs +++ b/crates/router/src/core/payouts.rs @@ -1166,6 +1166,7 @@ pub async fn payout_create_db_entries( Some(&customer_id.to_owned()), key_store, payout_id, + merchant_account.storage_scheme, ) .await?; let address_id = billing_address @@ -1302,6 +1303,7 @@ pub async fn make_payout_data( Some(&payouts.customer_id.to_owned()), key_store, &payouts.payout_id, + merchant_account.storage_scheme, ) .await?; diff --git a/crates/router/src/db/address.rs b/crates/router/src/db/address.rs index faea880f9e38..2205f4555d74 100644 --- a/crates/router/src/db/address.rs +++ b/crates/router/src/db/address.rs @@ -1,30 +1,30 @@ -use common_utils::ext_traits::AsyncExt; +use data_models::MerchantStorageScheme; use diesel_models::address::AddressUpdateInternal; -use error_stack::{IntoReport, ResultExt}; +use error_stack::ResultExt; use router_env::{instrument, tracing}; -use super::{MockDb, Store}; +use super::MockDb; use crate::{ - connection, core::errors::{self, CustomResult}, types::{ domain::{ self, behaviour::{Conversion, ReverseConversion}, }, - storage, + storage as storage_types, }, }; #[async_trait::async_trait] pub trait AddressInterface where - domain::Address: Conversion, + domain::Address: + Conversion, { async fn update_address( &self, address_id: String, - address: storage::AddressUpdate, + address: storage_types::AddressUpdate, key_store: &domain::MerchantKeyStore, ) -> CustomResult; @@ -33,6 +33,7 @@ where payment_id: &str, address: domain::Address, key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, ) -> CustomResult; async fn insert_address_for_customers( @@ -47,54 +48,55 @@ where payment_id: &str, address_id: &str, key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, ) -> CustomResult; async fn update_address_by_merchant_id_customer_id( &self, customer_id: &str, merchant_id: &str, - address: storage::AddressUpdate, + address: storage_types::AddressUpdate, key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError>; } -#[async_trait::async_trait] -impl AddressInterface for Store { - async fn find_address_by_merchant_id_payment_id_address_id( - &self, - merchant_id: &str, - payment_id: &str, - address_id: &str, - key_store: &domain::MerchantKeyStore, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage::Address::find_by_merchant_id_payment_id_address_id( - &conn, - merchant_id, - payment_id, - address_id, - ) - .await - .map_err(Into::into) - .into_report() - .async_and_then(|address| async { - address - .convert(key_store.key.get_inner()) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - } +#[cfg(not(feature = "kv_store"))] +mod storage { + use common_utils::ext_traits::AsyncExt; + use data_models::MerchantStorageScheme; + use error_stack::{IntoReport, ResultExt}; + use router_env::{instrument, tracing}; - #[instrument(skip_all)] - async fn update_address( - &self, - address_id: String, - address: storage::AddressUpdate, - key_store: &domain::MerchantKeyStore, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - storage::Address::update_by_address_id(&conn, address_id, address.into()) + use super::AddressInterface; + use crate::{ + connection, + core::errors::{self, CustomResult}, + services::Store, + types::{ + domain::{ + self, + behaviour::{Conversion, ReverseConversion}, + }, + storage::{self as storage_types}, + }, + }; + #[async_trait::async_trait] + impl AddressInterface for Store { + async fn find_address_by_merchant_id_payment_id_address_id( + &self, + merchant_id: &str, + payment_id: &str, + address_id: &str, + key_store: &domain::MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + storage_types::Address::find_by_merchant_id_payment_id_address_id( + &conn, + merchant_id, + payment_id, + address_id, + ) .await .map_err(Into::into) .into_report() @@ -105,85 +107,341 @@ impl AddressInterface for Store { .change_context(errors::StorageError::DecryptionError) }) .await - } + } - async fn insert_address_for_payments( - &self, - _payment_id: &str, - address: domain::Address, - key_store: &domain::MerchantKeyStore, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - address - .construct_new() - .await - .change_context(errors::StorageError::EncryptionError)? - .insert(&conn) + #[instrument(skip_all)] + async fn update_address( + &self, + address_id: String, + address: storage_types::AddressUpdate, + key_store: &domain::MerchantKeyStore, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + storage_types::Address::update_by_address_id(&conn, address_id, address.into()) + .await + .map_err(Into::into) + .into_report() + .async_and_then(|address| async { + address + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .await + } + + async fn insert_address_for_payments( + &self, + _payment_id: &str, + address: domain::Address, + key_store: &domain::MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + address + .construct_new() + .await + .change_context(errors::StorageError::EncryptionError)? + .insert(&conn) + .await + .map_err(Into::into) + .into_report() + .async_and_then(|address| async { + address + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .await + } + + async fn insert_address_for_customers( + &self, + address: domain::Address, + key_store: &domain::MerchantKeyStore, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + address + .construct_new() + .await + .change_context(errors::StorageError::EncryptionError)? + .insert(&conn) + .await + .map_err(Into::into) + .into_report() + .async_and_then(|address| async { + address + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .await + } + + async fn update_address_by_merchant_id_customer_id( + &self, + customer_id: &str, + merchant_id: &str, + address: storage_types::AddressUpdate, + key_store: &domain::MerchantKeyStore, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_write(self).await?; + storage_types::Address::update_by_merchant_id_customer_id( + &conn, + customer_id, + merchant_id, + address.into(), + ) .await .map_err(Into::into) .into_report() - .async_and_then(|address| async { - address - .convert(key_store.key.get_inner()) - .await - .change_context(errors::StorageError::DecryptionError) + .async_and_then(|addresses| async { + let mut output = Vec::with_capacity(addresses.len()); + for address in addresses.into_iter() { + output.push( + address + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError)?, + ) + } + Ok(output) }) .await + } } +} - async fn insert_address_for_customers( - &self, - address: domain::Address, - key_store: &domain::MerchantKeyStore, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - address - .construct_new() - .await - .change_context(errors::StorageError::EncryptionError)? - .insert(&conn) - .await - .map_err(Into::into) - .into_report() - .async_and_then(|address| async { - address - .convert(key_store.key.get_inner()) +#[cfg(feature = "kv_store")] +mod storage { + use common_utils::ext_traits::AsyncExt; + use data_models::MerchantStorageScheme; + use error_stack::{IntoReport, ResultExt}; + use redis_interface::HsetnxReply; + use router_env::{instrument, tracing}; + use storage_impl::redis::kv_store::{PartitionKey, RedisConnInterface}; + + use super::AddressInterface; + use crate::{ + connection, + core::errors::{self, CustomResult}, + services::Store, + types::{ + domain::{ + self, + behaviour::{Conversion, ReverseConversion}, + }, + storage::{self as storage_types, kv}, + }, + utils::db_utils, + }; + #[async_trait::async_trait] + impl AddressInterface for Store { + async fn find_address_by_merchant_id_payment_id_address_id( + &self, + merchant_id: &str, + payment_id: &str, + address_id: &str, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + let database_call = || async { + storage_types::Address::find_by_merchant_id_payment_id_address_id( + &conn, + merchant_id, + payment_id, + address_id, + ) + .await + .map_err(Into::into) + .into_report() + }; + let address = match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", merchant_id, payment_id); + let field = format!("add_{}", address_id); + db_utils::try_redis_get_else_try_database_get( + self.get_redis_conn() + .change_context(errors::StorageError::DatabaseConnectionError)? + .get_hash_field_and_deserialize(&key, &field, "Address"), + database_call, + ) .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - } + } + }?; + address + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError) + } - async fn update_address_by_merchant_id_customer_id( - &self, - customer_id: &str, - merchant_id: &str, - address: storage::AddressUpdate, - key_store: &domain::MerchantKeyStore, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_write(self).await?; - storage::Address::update_by_merchant_id_customer_id( - &conn, - customer_id, - merchant_id, - address.into(), - ) - .await - .map_err(Into::into) - .into_report() - .async_and_then(|addresses| async { - let mut output = Vec::with_capacity(addresses.len()); - for address in addresses.into_iter() { - output.push( + #[instrument(skip_all)] + async fn update_address( + &self, + address_id: String, + address: storage_types::AddressUpdate, + key_store: &domain::MerchantKeyStore, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + storage_types::Address::update_by_address_id(&conn, address_id, address.into()) + .await + .map_err(Into::into) + .into_report() + .async_and_then(|address| async { address .convert(key_store.key.get_inner()) .await - .change_context(errors::StorageError::DecryptionError)?, - ) + .change_context(errors::StorageError::DecryptionError) + }) + .await + } + + async fn insert_address_for_payments( + &self, + payment_id: &str, + address: domain::Address, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let address_new = address + .clone() + .construct_new() + .await + .change_context(errors::StorageError::EncryptionError)?; + let merchant_id = address_new.merchant_id.clone(); + match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + let conn = connection::pg_connection_write(self).await?; + address_new + .insert(&conn) + .await + .map_err(Into::into) + .into_report() + .async_and_then(|address| async { + address + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .await + } + MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", merchant_id, payment_id); + let field = format!("add_{}", &address_new.address_id); + let created_address = diesel_models::Address { + id: Some(0i32), + address_id: address_new.address_id.clone(), + city: address_new.city.clone(), + country: address_new.country, + line1: address_new.line1.clone(), + line2: address_new.line2.clone(), + line3: address_new.line3.clone(), + state: address_new.state.clone(), + zip: address_new.zip.clone(), + first_name: address_new.first_name.clone(), + last_name: address_new.last_name.clone(), + phone_number: address_new.phone_number.clone(), + country_code: address_new.country_code.clone(), + created_at: address_new.created_at, + modified_at: address_new.modified_at, + customer_id: address_new.customer_id.clone(), + merchant_id: address_new.merchant_id.clone(), + payment_id: address_new.payment_id.clone(), + }; + match self + .get_redis_conn() + .map_err(Into::::into)? + .serialize_and_set_hash_field_if_not_exist(&key, &field, &created_address) + .await + { + Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { + entity: "address", + key: Some(address_new.address_id), + }) + .into_report(), + Ok(HsetnxReply::KeySet) => { + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: kv::Insertable::Address(Box::new(address_new)), + }, + }; + self.push_to_drainer_stream::( + redis_entry, + PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id, + }, + ) + .await + .change_context(errors::StorageError::KVError)?; + + Ok(created_address + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError)?) + } + Err(er) => Err(er).change_context(errors::StorageError::KVError), + } + } } - Ok(output) - }) - .await + } + + async fn insert_address_for_customers( + &self, + address: domain::Address, + key_store: &domain::MerchantKeyStore, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + address + .construct_new() + .await + .change_context(errors::StorageError::EncryptionError)? + .insert(&conn) + .await + .map_err(Into::into) + .into_report() + .async_and_then(|address| async { + address + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .await + } + + async fn update_address_by_merchant_id_customer_id( + &self, + customer_id: &str, + merchant_id: &str, + address: storage_types::AddressUpdate, + key_store: &domain::MerchantKeyStore, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_write(self).await?; + storage_types::Address::update_by_merchant_id_customer_id( + &conn, + customer_id, + merchant_id, + address.into(), + ) + .await + .map_err(Into::into) + .into_report() + .async_and_then(|addresses| async { + let mut output = Vec::with_capacity(addresses.len()); + for address in addresses.into_iter() { + output.push( + address + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError)?, + ) + } + Ok(output) + }) + .await + } } } @@ -195,6 +453,7 @@ impl AddressInterface for MockDb { _payment_id: &str, address_id: &str, key_store: &domain::MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, ) -> CustomResult { match self .addresses @@ -220,7 +479,7 @@ impl AddressInterface for MockDb { async fn update_address( &self, address_id: String, - address_update: storage::AddressUpdate, + address_update: storage_types::AddressUpdate, key_store: &domain::MerchantKeyStore, ) -> CustomResult { match self @@ -251,6 +510,7 @@ impl AddressInterface for MockDb { _payment_id: &str, address_new: domain::Address, key_store: &domain::MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, ) -> CustomResult { let mut addresses = self.addresses.lock().await; @@ -289,7 +549,7 @@ impl AddressInterface for MockDb { &self, customer_id: &str, merchant_id: &str, - address_update: storage::AddressUpdate, + address_update: storage_types::AddressUpdate, key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { match self diff --git a/crates/storage_impl/src/address.rs b/crates/storage_impl/src/address.rs new file mode 100644 index 000000000000..519d4b10d322 --- /dev/null +++ b/crates/storage_impl/src/address.rs @@ -0,0 +1,5 @@ +use diesel_models::address::Address; + +use crate::redis::kv_store::KvStorePartition; + +impl KvStorePartition for Address {} diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index a7bc9e13cf0a..f3708537359f 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -5,6 +5,7 @@ use diesel_models::{self as store}; use error_stack::ResultExt; use masking::StrongSecret; use redis::{kv_store::RedisConnInterface, RedisStore}; +mod address; pub mod config; pub mod connection; pub mod database;