Skip to content

Commit

Permalink
feat(merchant_connector_account): add cache for querying by `merchant…
Browse files Browse the repository at this point in the history
…_connector_id` (#2738)
  • Loading branch information
Narayanbhat166 authored Nov 3, 2023
1 parent 255a4f8 commit 1ba6282
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 37 deletions.
4 changes: 2 additions & 2 deletions crates/router/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,8 +1020,8 @@ pub async fn update_payment_connector(
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed while encrypting data")?,
test_mode: mca.test_mode,
disabled: mca.disabled,
test_mode: req.test_mode,
disabled: req.disabled,
payment_methods_enabled,
metadata: req.metadata,
frm_configs,
Expand Down
2 changes: 1 addition & 1 deletion crates/router/src/core/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub async fn invalidate(
key: &str,
) -> CustomResult<services::api::ApplicationResponse<serde_json::Value>, errors::ApiErrorResponse> {
let store = state.store.as_ref();
let result = publish_into_redact_channel(store, CacheKind::All(key.into()))
let result = publish_into_redact_channel(store, [CacheKind::All(key.into())])
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;

Expand Down
5 changes: 5 additions & 0 deletions crates/router/src/core/payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,11 @@ where
)
.await?;

if payment_data.payment_attempt.merchant_connector_id.is_none() {
payment_data.payment_attempt.merchant_connector_id =
merchant_connector_account.get_mca_id();
}

let (pd, tokenization_action) = get_connector_tokenization_action_when_confirm_true(
state,
operation,
Expand Down
37 changes: 30 additions & 7 deletions crates/router/src/db/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ where
Ok(data)
}

pub async fn publish_into_redact_channel<'a>(
pub async fn publish_into_redact_channel<'a, K: IntoIterator<Item = CacheKind<'a>> + Send>(
store: &dyn StorageInterface,
key: CacheKind<'a>,
keys: K,
) -> CustomResult<usize, errors::StorageError> {
let redis_conn = store
.get_redis_conn()
Expand All @@ -111,10 +111,18 @@ pub async fn publish_into_redact_channel<'a>(
))
.attach_printable("Failed to get redis connection")?;

redis_conn
.publish(consts::PUB_SUB_CHANNEL, key)
.await
.change_context(errors::StorageError::KVError)
let futures = keys.into_iter().map(|key| async {
redis_conn
.clone()
.publish(consts::PUB_SUB_CHANNEL, key)
.await
.change_context(errors::StorageError::KVError)
});

Ok(futures::future::try_join_all(futures)
.await?
.iter()
.sum::<usize>())
}

pub async fn publish_and_redact<'a, T, F, Fut>(
Expand All @@ -127,6 +135,21 @@ where
Fut: futures::Future<Output = CustomResult<T, errors::StorageError>> + Send,
{
let data = fun().await?;
publish_into_redact_channel(store, key).await?;
publish_into_redact_channel(store, [key]).await?;
Ok(data)
}

pub async fn publish_and_redact_multiple<'a, T, F, Fut, K>(
store: &dyn StorageInterface,
keys: K,
fun: F,
) -> CustomResult<T, errors::StorageError>
where
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, errors::StorageError>> + Send,
K: IntoIterator<Item = CacheKind<'a>> + Send,
{
let data = fun().await?;
publish_into_redact_channel(store, keys).await?;
Ok(data)
}
22 changes: 10 additions & 12 deletions crates/router/src/db/merchant_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,19 +399,17 @@ async fn publish_and_redact_merchant_account_cache(
store: &dyn super::StorageInterface,
merchant_account: &storage::MerchantAccount,
) -> CustomResult<(), errors::StorageError> {
super::cache::publish_into_redact_channel(
store,
CacheKind::Accounts(merchant_account.merchant_id.as_str().into()),
)
.await?;
merchant_account
let publishable_key = merchant_account
.publishable_key
.as_ref()
.async_map(|pub_key| async {
super::cache::publish_into_redact_channel(store, CacheKind::Accounts(pub_key.into()))
.await
})
.await
.transpose()?;
.map(|publishable_key| CacheKind::Accounts(publishable_key.into()));

let mut cache_keys = vec![CacheKind::Accounts(
merchant_account.merchant_id.as_str().into(),
)];

cache_keys.extend(publishable_key.into_iter());

super::cache::publish_into_redact_channel(store, cache_keys).await?;
Ok(())
}
60 changes: 45 additions & 15 deletions crates/router/src/db/merchant_connector_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,21 +290,40 @@ impl MerchantConnectorAccountInterface for Store {
merchant_connector_id: &str,
key_store: &domain::MerchantKeyStore,
) -> CustomResult<domain::MerchantConnectorAccount, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id(
&conn,
merchant_id,
merchant_connector_id,
)
.await
.map_err(Into::into)
.into_report()
.async_and_then(|item| async {
item.convert(key_store.key.get_inner())
let find_call = || async {
let conn = connection::pg_connection_read(self).await?;
storage::MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id(
&conn,
merchant_id,
merchant_connector_id,
)
.await
.map_err(Into::into)
.into_report()
};

#[cfg(not(feature = "accounts_cache"))]
{
find_call()
.await?
.convert(key_store.key.get_inner())
.await
.change_context(errors::StorageError::DecryptionError)
})
.await
}

#[cfg(feature = "accounts_cache")]
{
super::cache::get_or_populate_in_memory(
self,
&format!("{}_{}", merchant_id, merchant_connector_id),
find_call,
&cache::ACCOUNTS_CACHE,
)
.await?
.convert(key_store.key.get_inner())
.await
.change_context(errors::StorageError::DecryptionError)
}
}

async fn insert_merchant_connector_account(
Expand Down Expand Up @@ -367,6 +386,9 @@ impl MerchantConnectorAccountInterface for Store {
"profile_id".to_string(),
))?;

let _merchant_id = this.merchant_id.clone();
let _merchant_connector_id = this.merchant_connector_id.clone();

let update_call = || async {
let conn = connection::pg_connection_write(self).await?;
Conversion::convert(this)
Expand All @@ -386,9 +408,17 @@ impl MerchantConnectorAccountInterface for Store {

#[cfg(feature = "accounts_cache")]
{
super::cache::publish_and_redact(
// Redact both the caches as any one or both might be used because of backwards compatibility
super::cache::publish_and_redact_multiple(
self,
cache::CacheKind::Accounts(format!("{}_{}", _profile_id, _connector_name).into()),
[
cache::CacheKind::Accounts(
format!("{}_{}", _profile_id, _connector_name).into(),
),
cache::CacheKind::Accounts(
format!("{}_{}", _merchant_id, _merchant_connector_id).into(),
),
],
update_call,
)
.await
Expand Down

0 comments on commit 1ba6282

Please sign in to comment.