Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kv): add kv wrapper for executing kv tasks #2384

Merged
merged 5 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions crates/redis_interface/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ impl super::RedisConnectionPool {
&self,
key: &str,
values: V,
ttl: Option<u32>,
) -> CustomResult<(), errors::RedisError>
where
V: TryInto<RedisMap> + Debug + Send + Sync,
Expand All @@ -246,7 +247,9 @@ impl super::RedisConnectionPool {

// setting expiry for the key
output
.async_and_then(|_| self.set_expiry(key, self.config.default_hash_ttl.into()))
.async_and_then(|_| {
self.set_expiry(key, ttl.unwrap_or(self.config.default_hash_ttl).into())
})
.await
}

Expand All @@ -256,6 +259,7 @@ impl super::RedisConnectionPool {
key: &str,
field: &str,
value: V,
ttl: Option<u32>,
) -> CustomResult<HsetnxReply, errors::RedisError>
where
V: TryInto<RedisValue> + Debug + Send + Sync,
Expand All @@ -270,7 +274,7 @@ impl super::RedisConnectionPool {

output
.async_and_then(|inner| async {
self.set_expiry(key, self.config.default_hash_ttl.into())
self.set_expiry(key, ttl.unwrap_or(self.config.default_hash_ttl).into())
.await?;
Ok(inner)
})
Expand All @@ -283,14 +287,15 @@ impl super::RedisConnectionPool {
key: &str,
field: &str,
value: V,
ttl: Option<u32>,
) -> CustomResult<HsetnxReply, errors::RedisError>
where
V: serde::Serialize + Debug,
{
let serialized = Encode::<V>::encode_to_vec(&value)
.change_context(errors::RedisError::JsonSerializationFailed)?;

self.set_hash_field_if_not_exist(key, field, serialized.as_slice())
self.set_hash_field_if_not_exist(key, field, serialized.as_slice(), ttl)
.await
}

Expand Down Expand Up @@ -339,14 +344,15 @@ impl super::RedisConnectionPool {
&self,
kv: &[(&str, V)],
field: &str,
ttl: Option<u32>,
) -> CustomResult<Vec<HsetnxReply>, errors::RedisError>
where
V: serde::Serialize + Debug,
{
let mut hsetnx: Vec<HsetnxReply> = Vec::with_capacity(kv.len());
for (key, val) in kv {
hsetnx.push(
self.serialize_and_set_hash_field_if_not_exist(key, field, val)
self.serialize_and_set_hash_field_if_not_exist(key, field, val, ttl)
.await?,
);
}
Expand Down
2 changes: 2 additions & 0 deletions crates/redis_interface/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ pub enum RedisError {
PublishError,
#[error("Failed while receiving message from publisher")]
OnMessageError,
#[error("Got an unknown result from redis")]
UnknownResult,
}
3 changes: 3 additions & 0 deletions crates/router/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ pub(crate) const QR_IMAGE_DATA_SOURCE_STRING: &str = "data:image/png;base64";
pub(crate) const MERCHANT_ID_FIELD_EXTENSION_ID: &str = "1.2.840.113635.100.6.32";

pub(crate) const METRICS_HOST_TAG_NAME: &str = "host";

// TTL for KV setup
pub(crate) const KV_TTL: u32 = 300;
64 changes: 62 additions & 2 deletions crates/router/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ pub mod payouts;
pub mod refund;
pub mod reverse_lookup;

use std::fmt::Debug;

use data_models::payments::{
payment_attempt::PaymentAttemptInterface, payment_intent::PaymentIntentInterface,
};
use masking::PeekInterface;
use redis_interface::errors::RedisError;
use serde::de;
use storage_impl::{redis::kv_store::RedisConnInterface, MockDb};

use crate::services::Store;
use crate::{consts, errors::CustomResult, services::Store};

#[derive(PartialEq, Eq)]
pub enum StorageImpl {
Expand Down Expand Up @@ -115,7 +119,7 @@ pub async fn get_and_deserialize_key<T>(
db: &dyn StorageInterface,
key: &str,
type_name: &'static str,
) -> common_utils::errors::CustomResult<T, redis_interface::errors::RedisError>
) -> CustomResult<T, RedisError>
where
T: serde::de::DeserializeOwned,
{
Expand All @@ -128,4 +132,60 @@ where
.change_context(redis_interface::errors::RedisError::JsonDeserializationFailed)
}

pub enum KvOperation<'a, S: serde::Serialize + Debug> {
Set((&'a str, String)),
SetNx(&'a str, S),
Get(&'a str),
Scan(&'a str),
}

#[derive(router_derive::TryGetEnumVariant)]
#[error(RedisError(UnknownResult))]
pub enum KvResult<T: de::DeserializeOwned> {
Get(T),
Set(()),
SetNx(redis_interface::HsetnxReply),
Scan(Vec<T>),
}

pub async fn kv_wrapper<'a, T, S>(
store: &Store,
op: KvOperation<'a, S>,
key: impl AsRef<str>,
) -> CustomResult<KvResult<T>, RedisError>
where
T: de::DeserializeOwned,
S: serde::Serialize + Debug,
{
let redis_conn = store.get_redis_conn()?;

let key = key.as_ref();
let type_name = std::any::type_name::<T>();

match op {
KvOperation::Set(value) => {
redis_conn
.set_hash_fields(key, value, Some(consts::KV_TTL))
.await?;
Ok(KvResult::Set(()))
}
KvOperation::Get(field) => {
let result = redis_conn
.get_hash_field_and_deserialize(key, field, type_name)
.await?;
Ok(KvResult::Get(result))
}
KvOperation::Scan(pattern) => {
let result: Vec<T> = redis_conn.hscan_and_deserialize(key, pattern, None).await?;
Ok(KvResult::Scan(result))
}
KvOperation::SetNx(field, value) => {
let result = redis_conn
.serialize_and_set_hash_field_if_not_exist(key, field, value, Some(consts::KV_TTL))
.await?;
Ok(KvResult::SetNx(result))
}
}
}

dyn_clone::clone_trait_object!(StorageInterface);
28 changes: 19 additions & 9 deletions crates/router/src/db/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ mod storage {
use error_stack::{IntoReport, ResultExt};
use redis_interface::HsetnxReply;
use router_env::{instrument, tracing};
use storage_impl::redis::kv_store::{PartitionKey, RedisConnInterface};
use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey};

use super::AddressInterface;
use crate::{
Expand Down Expand Up @@ -307,9 +307,15 @@ mod storage {
let key = format!("{}_{}", merchant_id, payment_id);
let field = format!("add_{}", address_id);
db_utils::try_redis_get_else_try_database_get(
self.get_redis_conn()
.change_context(errors::StorageError::DatabaseConnectionError)?
.get_hash_field_and_deserialize(&key, &field, "Address"),
async {
kv_wrapper(
self,
KvOperation::<diesel_models::Address>::Get(&field),
key,
)
.await?
.try_into_get()
},
database_call,
)
.await
Expand Down Expand Up @@ -394,11 +400,15 @@ mod storage {
merchant_id: address_new.merchant_id.clone(),
payment_id: address_new.payment_id.clone(),
};
match self
.get_redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.serialize_and_set_hash_field_if_not_exist(&key, &field, &created_address)
.await

match kv_wrapper::<diesel_models::Address, _, _>(
self,
KvOperation::SetNx(&field, &created_address),
&key,
)
.await
.change_context(errors::StorageError::KVError)?
.try_into_setnx()
{
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: "address",
Expand Down
60 changes: 33 additions & 27 deletions crates/router/src/db/connector_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ mod storage {
use error_stack::{IntoReport, ResultExt};
use redis_interface::HsetnxReply;
use router_env::{instrument, tracing};
use storage_impl::redis::kv_store::{PartitionKey, RedisConnInterface};
use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey};

use super::Store;
use crate::{
Expand Down Expand Up @@ -148,15 +148,15 @@ mod storage {
authentication_data: connector_response.authentication_data.clone(),
encoded_data: connector_response.encoded_data.clone(),
};
match self
.get_redis_conn()
.map_err(|er| error_stack::report!(errors::StorageError::RedisError(er)))?
.serialize_and_set_hash_field_if_not_exist(
&key,
&field,
&created_connector_resp,
)
.await

match kv_wrapper::<storage_type::ConnectorResponse, _, _>(
self,
KvOperation::SetNx(&field, &created_connector_resp),
&key,
)
.await
.change_context(errors::StorageError::KVError)?
.try_into_setnx()
{
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: "address",
Expand Down Expand Up @@ -213,17 +213,20 @@ mod storage {
data_models::MerchantStorageScheme::RedisKv => {
let key = format!("{merchant_id}_{payment_id}");
let field = format!("connector_resp_{merchant_id}_{payment_id}_{attempt_id}");
let redis_conn = self
.get_redis_conn()
.map_err(|er| error_stack::report!(errors::StorageError::RedisError(er)))?;

let redis_fut = redis_conn.get_hash_field_and_deserialize(
&key,
&field,
"ConnectorResponse",
);

db_utils::try_redis_get_else_try_database_get(redis_fut, database_call).await
db_utils::try_redis_get_else_try_database_get(
async {
kv_wrapper(
self,
KvOperation::<diesel_models::Address>::Get(&field),
key,
)
.await?
.try_into_get()
},
database_call,
)
.await
}
}
}
Expand Down Expand Up @@ -255,13 +258,16 @@ mod storage {
&updated_connector_response.payment_id,
&updated_connector_response.attempt_id
);
let updated_connector_response = self
.get_redis_conn()
.map_err(|er| error_stack::report!(errors::StorageError::RedisError(er)))?
.set_hash_fields(&key, (&field, &redis_value))
.await
.map(|_| updated_connector_response)
.change_context(errors::StorageError::KVError)?;

kv_wrapper::<(), _, _>(
self,
KvOperation::Set::<storage_type::ConnectorResponse>((&field, redis_value)),
&key,
)
.await
.change_context(errors::StorageError::KVError)?
.try_into_set()
.change_context(errors::StorageError::KVError)?;

let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
Expand Down
1 change: 1 addition & 0 deletions crates/router/src/db/ephemeral_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ mod storage {
.serialize_and_set_multiple_hash_field_if_not_exist(
&[(&secret_key, &created_ek), (&id_key, &created_ek)],
"ephkey",
None,
)
.await
{
Expand Down
Loading