From 1ba6282699b7dff5e6e95c9a14e51c0f8bf749cd Mon Sep 17 00:00:00 2001 From: Narayan Bhat <48803246+Narayanbhat166@users.noreply.github.com> Date: Fri, 3 Nov 2023 17:21:36 +0530 Subject: [PATCH] feat(merchant_connector_account): add cache for querying by `merchant_connector_id` (#2738) --- crates/router/src/core/admin.rs | 4 +- crates/router/src/core/cache.rs | 2 +- crates/router/src/core/payments.rs | 5 ++ crates/router/src/db/cache.rs | 37 +++++++++--- crates/router/src/db/merchant_account.rs | 22 ++++--- .../src/db/merchant_connector_account.rs | 60 ++++++++++++++----- 6 files changed, 93 insertions(+), 37 deletions(-) diff --git a/crates/router/src/core/admin.rs b/crates/router/src/core/admin.rs index cd1243dd3688..5c9f44ffe575 100644 --- a/crates/router/src/core/admin.rs +++ b/crates/router/src/core/admin.rs @@ -1020,8 +1020,8 @@ pub async fn update_payment_connector( .await .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed while encrypting data")?, - test_mode: mca.test_mode, - disabled: mca.disabled, + test_mode: req.test_mode, + disabled: req.disabled, payment_methods_enabled, metadata: req.metadata, frm_configs, diff --git a/crates/router/src/core/cache.rs b/crates/router/src/core/cache.rs index cba9a5ec303f..a8ca8395a670 100644 --- a/crates/router/src/core/cache.rs +++ b/crates/router/src/core/cache.rs @@ -10,7 +10,7 @@ pub async fn invalidate( key: &str, ) -> CustomResult, errors::ApiErrorResponse> { let store = state.store.as_ref(); - let result = publish_into_redact_channel(store, CacheKind::All(key.into())) + let result = publish_into_redact_channel(store, [CacheKind::All(key.into())]) .await .change_context(errors::ApiErrorResponse::InternalServerError)?; diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index f26b91479ece..9aa0e3c70d25 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -632,6 +632,11 @@ where ) .await?; + if payment_data.payment_attempt.merchant_connector_id.is_none() { + payment_data.payment_attempt.merchant_connector_id = + merchant_connector_account.get_mca_id(); + } + let (pd, tokenization_action) = get_connector_tokenization_action_when_confirm_true( state, operation, diff --git a/crates/router/src/db/cache.rs b/crates/router/src/db/cache.rs index 06ab85591a93..0688665f0c4c 100644 --- a/crates/router/src/db/cache.rs +++ b/crates/router/src/db/cache.rs @@ -100,9 +100,9 @@ where Ok(data) } -pub async fn publish_into_redact_channel<'a>( +pub async fn publish_into_redact_channel<'a, K: IntoIterator> + Send>( store: &dyn StorageInterface, - key: CacheKind<'a>, + keys: K, ) -> CustomResult { let redis_conn = store .get_redis_conn() @@ -111,10 +111,18 @@ pub async fn publish_into_redact_channel<'a>( )) .attach_printable("Failed to get redis connection")?; - redis_conn - .publish(consts::PUB_SUB_CHANNEL, key) - .await - .change_context(errors::StorageError::KVError) + let futures = keys.into_iter().map(|key| async { + redis_conn + .clone() + .publish(consts::PUB_SUB_CHANNEL, key) + .await + .change_context(errors::StorageError::KVError) + }); + + Ok(futures::future::try_join_all(futures) + .await? + .iter() + .sum::()) } pub async fn publish_and_redact<'a, T, F, Fut>( @@ -127,6 +135,21 @@ where Fut: futures::Future> + Send, { let data = fun().await?; - publish_into_redact_channel(store, key).await?; + publish_into_redact_channel(store, [key]).await?; + Ok(data) +} + +pub async fn publish_and_redact_multiple<'a, T, F, Fut, K>( + store: &dyn StorageInterface, + keys: K, + fun: F, +) -> CustomResult +where + F: FnOnce() -> Fut + Send, + Fut: futures::Future> + Send, + K: IntoIterator> + Send, +{ + let data = fun().await?; + publish_into_redact_channel(store, keys).await?; Ok(data) } diff --git a/crates/router/src/db/merchant_account.rs b/crates/router/src/db/merchant_account.rs index e0bff7d9069c..0d3ce99b948d 100644 --- a/crates/router/src/db/merchant_account.rs +++ b/crates/router/src/db/merchant_account.rs @@ -399,19 +399,17 @@ async fn publish_and_redact_merchant_account_cache( store: &dyn super::StorageInterface, merchant_account: &storage::MerchantAccount, ) -> CustomResult<(), errors::StorageError> { - super::cache::publish_into_redact_channel( - store, - CacheKind::Accounts(merchant_account.merchant_id.as_str().into()), - ) - .await?; - merchant_account + let publishable_key = merchant_account .publishable_key .as_ref() - .async_map(|pub_key| async { - super::cache::publish_into_redact_channel(store, CacheKind::Accounts(pub_key.into())) - .await - }) - .await - .transpose()?; + .map(|publishable_key| CacheKind::Accounts(publishable_key.into())); + + let mut cache_keys = vec![CacheKind::Accounts( + merchant_account.merchant_id.as_str().into(), + )]; + + cache_keys.extend(publishable_key.into_iter()); + + super::cache::publish_into_redact_channel(store, cache_keys).await?; Ok(()) } diff --git a/crates/router/src/db/merchant_connector_account.rs b/crates/router/src/db/merchant_connector_account.rs index 9ff3f5121082..ecf52531f28a 100644 --- a/crates/router/src/db/merchant_connector_account.rs +++ b/crates/router/src/db/merchant_connector_account.rs @@ -290,21 +290,40 @@ impl MerchantConnectorAccountInterface for Store { merchant_connector_id: &str, key_store: &domain::MerchantKeyStore, ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage::MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id( - &conn, - merchant_id, - merchant_connector_id, - ) - .await - .map_err(Into::into) - .into_report() - .async_and_then(|item| async { - item.convert(key_store.key.get_inner()) + let find_call = || async { + let conn = connection::pg_connection_read(self).await?; + storage::MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id( + &conn, + merchant_id, + merchant_connector_id, + ) + .await + .map_err(Into::into) + .into_report() + }; + + #[cfg(not(feature = "accounts_cache"))] + { + find_call() + .await? + .convert(key_store.key.get_inner()) .await .change_context(errors::StorageError::DecryptionError) - }) - .await + } + + #[cfg(feature = "accounts_cache")] + { + super::cache::get_or_populate_in_memory( + self, + &format!("{}_{}", merchant_id, merchant_connector_id), + find_call, + &cache::ACCOUNTS_CACHE, + ) + .await? + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError) + } } async fn insert_merchant_connector_account( @@ -367,6 +386,9 @@ impl MerchantConnectorAccountInterface for Store { "profile_id".to_string(), ))?; + let _merchant_id = this.merchant_id.clone(); + let _merchant_connector_id = this.merchant_connector_id.clone(); + let update_call = || async { let conn = connection::pg_connection_write(self).await?; Conversion::convert(this) @@ -386,9 +408,17 @@ impl MerchantConnectorAccountInterface for Store { #[cfg(feature = "accounts_cache")] { - super::cache::publish_and_redact( + // Redact both the caches as any one or both might be used because of backwards compatibility + super::cache::publish_and_redact_multiple( self, - cache::CacheKind::Accounts(format!("{}_{}", _profile_id, _connector_name).into()), + [ + cache::CacheKind::Accounts( + format!("{}_{}", _profile_id, _connector_name).into(), + ), + cache::CacheKind::Accounts( + format!("{}_{}", _merchant_id, _merchant_connector_id).into(), + ), + ], update_call, ) .await