feat: kv for reverse lookup (#2445)

This commit is contained in:
Kartikeya Hegde
2023-10-10 14:35:00 +05:30
committed by GitHub
parent 4b0fa1295c
commit 13aaf96db0
16 changed files with 453 additions and 163 deletions

View File

@ -133,8 +133,9 @@ where
}
pub enum KvOperation<'a, S: serde::Serialize + Debug> {
Set((&'a str, String)),
SetNx(&'a str, S),
Hset((&'a str, String)),
SetNx(S),
HSetNx(&'a str, S),
Get(&'a str),
Scan(&'a str),
}
@ -143,8 +144,9 @@ pub enum KvOperation<'a, S: serde::Serialize + Debug> {
#[error(RedisError(UnknownResult))]
pub enum KvResult<T: de::DeserializeOwned> {
Get(T),
Set(()),
SetNx(redis_interface::HsetnxReply),
Hset(()),
SetNx(redis_interface::SetnxReply),
HSetNx(redis_interface::HsetnxReply),
Scan(Vec<T>),
}
@ -163,11 +165,11 @@ where
let type_name = std::any::type_name::<T>();
match op {
KvOperation::Set(value) => {
KvOperation::Hset(value) => {
redis_conn
.set_hash_fields(key, value, Some(consts::KV_TTL))
.await?;
Ok(KvResult::Set(()))
Ok(KvResult::Hset(()))
}
KvOperation::Get(field) => {
let result = redis_conn
@ -179,10 +181,16 @@ where
let result: Vec<T> = redis_conn.hscan_and_deserialize(key, pattern, None).await?;
Ok(KvResult::Scan(result))
}
KvOperation::SetNx(field, value) => {
KvOperation::HSetNx(field, value) => {
let result = redis_conn
.serialize_and_set_hash_field_if_not_exist(key, field, value, Some(consts::KV_TTL))
.await?;
Ok(KvResult::HSetNx(result))
}
KvOperation::SetNx(value) => {
let result = redis_conn
.serialize_and_set_key_if_not_exist(key, value, Some(consts::KV_TTL.into()))
.await?;
Ok(KvResult::SetNx(result))
}
}

View File

@ -304,17 +304,17 @@ mod storage {
let address = match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", merchant_id, payment_id);
let key = format!("mid_{}_pid_{}", merchant_id, payment_id);
let field = format!("add_{}", address_id);
db_utils::try_redis_get_else_try_database_get(
async {
kv_wrapper(
self,
KvOperation::<diesel_models::Address>::Get(&field),
KvOperation::<diesel_models::Address>::HGet(&field),
key,
)
.await?
.try_into_get()
.try_into_hget()
},
database_call,
)
@ -378,7 +378,7 @@ mod storage {
.await
}
MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", merchant_id, payment_id);
let key = format!("mid_{}_pid_{}", merchant_id, payment_id);
let field = format!("add_{}", &address_new.address_id);
let created_address = diesel_models::Address {
id: Some(0i32),
@ -403,12 +403,12 @@ mod storage {
match kv_wrapper::<diesel_models::Address, _, _>(
self,
KvOperation::SetNx(&field, &created_address),
KvOperation::HSetNx(&field, &created_address),
&key,
)
.await
.change_context(errors::StorageError::KVError)?
.try_into_setnx()
.try_into_hsetnx()
{
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: "address",

View File

@ -131,7 +131,7 @@ mod storage {
let payment_id = &connector_response.payment_id;
let attempt_id = &connector_response.attempt_id;
let key = format!("{merchant_id}_{payment_id}");
let key = format!("mid_{merchant_id}_pid_{payment_id}");
let field = format!("connector_resp_{merchant_id}_{payment_id}_{attempt_id}");
let created_connector_resp = storage_type::ConnectorResponse {
@ -151,12 +151,12 @@ mod storage {
match kv_wrapper::<storage_type::ConnectorResponse, _, _>(
self,
KvOperation::SetNx(&field, &created_connector_resp),
KvOperation::HSetNx(&field, &created_connector_resp),
&key,
)
.await
.change_context(errors::StorageError::KVError)?
.try_into_setnx()
.try_into_hsetnx()
{
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: "address",
@ -211,18 +211,18 @@ mod storage {
match storage_scheme {
data_models::MerchantStorageScheme::PostgresOnly => database_call().await,
data_models::MerchantStorageScheme::RedisKv => {
let key = format!("{merchant_id}_{payment_id}");
let key = format!("mid_{merchant_id}_pid_{payment_id}");
let field = format!("connector_resp_{merchant_id}_{payment_id}_{attempt_id}");
db_utils::try_redis_get_else_try_database_get(
async {
kv_wrapper(
self,
KvOperation::<diesel_models::Address>::Get(&field),
KvOperation::<diesel_models::Address>::HGet(&field),
key,
)
.await?
.try_into_get()
.try_into_hget()
},
database_call,
)
@ -245,7 +245,7 @@ mod storage {
.map_err(Into::into)
.into_report(),
data_models::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", this.merchant_id, this.payment_id);
let key = format!("mid_{}_pid_{}", this.merchant_id, this.payment_id);
let updated_connector_response = connector_response_update
.clone()
.apply_changeset(this.clone());
@ -261,12 +261,12 @@ mod storage {
kv_wrapper::<(), _, _>(
self,
KvOperation::Set::<storage_type::ConnectorResponse>((&field, redis_value)),
KvOperation::Hset::<storage_type::ConnectorResponse>((&field, redis_value)),
&key,
)
.await
.change_context(errors::StorageError::KVError)?
.try_into_set()
.try_into_hset()
.change_context(errors::StorageError::KVError)?;
let redis_entry = kv::TypedSql {

View File

@ -305,18 +305,20 @@ mod storage {
enums::MerchantStorageScheme::PostgresOnly => database_call().await,
enums::MerchantStorageScheme::RedisKv => {
let lookup_id = format!("{merchant_id}_{internal_reference_id}");
let lookup = self.get_lookup_by_lookup_id(&lookup_id).await?;
let lookup = self
.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await?;
let key = &lookup.pk_id;
db_utils::try_redis_get_else_try_database_get(
async {
kv_wrapper(
self,
KvOperation::<storage_types::Refund>::Get(&lookup.sk_id),
KvOperation::<storage_types::Refund>::HGet(&lookup.sk_id),
key,
)
.await?
.try_into_get()
.try_into_hget()
},
database_call,
)
@ -336,7 +338,7 @@ mod storage {
new.insert(&conn).await.map_err(Into::into).into_report()
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", new.merchant_id, new.payment_id);
let key = format!("mid_{}_pid_{}", new.merchant_id, new.payment_id);
// TODO: need to add an application generated payment attempt id to distinguish between multiple attempts for the same payment id
// Check for database presence as well Maybe use a read replica here ?
let created_refund = storage_types::Refund {
@ -373,12 +375,12 @@ mod storage {
);
match kv_wrapper::<storage_types::Refund, _, _>(
self,
KvOperation::SetNx(&field, &created_refund),
KvOperation::HSetNx(&field, &created_refund),
&key,
)
.await
.change_context(errors::StorageError::KVError)?
.try_into_setnx()
.try_into_hsetnx()
{
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: "refund",
@ -386,8 +388,6 @@ mod storage {
})
.into_report(),
Ok(HsetnxReply::KeySet) => {
let conn = connection::pg_connection_write(self).await?;
let mut reverse_lookups = vec![
storage_types::ReverseLookupNew {
sk_id: field.clone(),
@ -425,9 +425,11 @@ mod storage {
source: "refund".to_string(),
})
};
storage_types::ReverseLookupNew::batch_insert(reverse_lookups, &conn)
.await
.change_context(errors::StorageError::KVError)?;
let rev_look = reverse_lookups
.into_iter()
.map(|rev| self.insert_reverse_lookup(rev, storage_scheme));
futures::future::try_join_all(rev_look).await?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
@ -473,7 +475,10 @@ mod storage {
enums::MerchantStorageScheme::PostgresOnly => database_call().await,
enums::MerchantStorageScheme::RedisKv => {
let lookup_id = format!("{merchant_id}_{connector_transaction_id}");
let lookup = match self.get_lookup_by_lookup_id(&lookup_id).await {
let lookup = match self
.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await
{
Ok(l) => l,
Err(err) => {
logger::error!(?err);
@ -516,7 +521,7 @@ mod storage {
.into_report()
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", this.merchant_id, this.payment_id);
let key = format!("mid_{}_pid_{}", this.merchant_id, this.payment_id);
let field = format!("pa_{}_ref_{}", &this.attempt_id, &this.refund_id);
let updated_refund = refund.clone().apply_changeset(this.clone());
@ -528,12 +533,12 @@ mod storage {
kv_wrapper::<(), _, _>(
self,
KvOperation::Set::<storage_types::Refund>((&field, redis_value)),
KvOperation::Hset::<storage_types::Refund>((&field, redis_value)),
&key,
)
.await
.change_context(errors::StorageError::KVError)?
.try_into_set()
.try_into_hset()
.change_context(errors::StorageError::KVError)?;
let redis_entry = kv::TypedSql {
@ -575,18 +580,20 @@ mod storage {
enums::MerchantStorageScheme::PostgresOnly => database_call().await,
enums::MerchantStorageScheme::RedisKv => {
let lookup_id = format!("{merchant_id}_{refund_id}");
let lookup = self.get_lookup_by_lookup_id(&lookup_id).await?;
let lookup = self
.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await?;
let key = &lookup.pk_id;
db_utils::try_redis_get_else_try_database_get(
async {
kv_wrapper(
self,
KvOperation::<storage_types::Refund>::Get(&lookup.sk_id),
KvOperation::<storage_types::Refund>::HGet(&lookup.sk_id),
key,
)
.await?
.try_into_get()
.try_into_hget()
},
database_call,
)
@ -618,18 +625,20 @@ mod storage {
enums::MerchantStorageScheme::PostgresOnly => database_call().await,
enums::MerchantStorageScheme::RedisKv => {
let lookup_id = format!("{merchant_id}_{connector_refund_id}_{connector}");
let lookup = self.get_lookup_by_lookup_id(&lookup_id).await?;
let lookup = self
.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await?;
let key = &lookup.pk_id;
db_utils::try_redis_get_else_try_database_get(
async {
kv_wrapper(
self,
KvOperation::<storage_types::Refund>::Get(&lookup.sk_id),
KvOperation::<storage_types::Refund>::HGet(&lookup.sk_id),
key,
)
.await?
.try_into_get()
.try_into_hget()
},
database_call,
)
@ -658,7 +667,7 @@ mod storage {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => database_call().await,
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{merchant_id}_{payment_id}");
let key = format!("mid_{merchant_id}_pid_{payment_id}");
db_utils::try_redis_get_else_try_database_get(
async {
kv_wrapper(

View File

@ -1,10 +1,10 @@
use error_stack::IntoReport;
use super::{cache, MockDb, Store};
use super::{MockDb, Store};
use crate::{
connection,
errors::{self, CustomResult},
types::storage::reverse_lookup::{ReverseLookup, ReverseLookupNew},
types::storage::{
enums,
reverse_lookup::{ReverseLookup, ReverseLookupNew},
},
};
#[async_trait::async_trait]
@ -12,35 +12,156 @@ pub trait ReverseLookupInterface {
async fn insert_reverse_lookup(
&self,
_new: ReverseLookupNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ReverseLookup, errors::StorageError>;
async fn get_lookup_by_lookup_id(
&self,
_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ReverseLookup, errors::StorageError>;
}
#[async_trait::async_trait]
impl ReverseLookupInterface for Store {
async fn insert_reverse_lookup(
&self,
new: ReverseLookupNew,
) -> CustomResult<ReverseLookup, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
#[cfg(not(feature = "kv_store"))]
mod storage {
use error_stack::IntoReport;
async fn get_lookup_by_lookup_id(
&self,
id: &str,
) -> CustomResult<ReverseLookup, errors::StorageError> {
let database_call = || async {
use super::{ReverseLookupInterface, Store};
use crate::{
connection,
errors::{self, CustomResult},
types::storage::{
enums,
reverse_lookup::{ReverseLookup, ReverseLookupNew},
},
};
#[async_trait::async_trait]
impl ReverseLookupInterface for Store {
async fn insert_reverse_lookup(
&self,
new: ReverseLookupNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ReverseLookup, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
async fn get_lookup_by_lookup_id(
&self,
id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ReverseLookup, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
ReverseLookup::find_by_lookup_id(id, &conn)
.await
.map_err(Into::into)
.into_report()
};
cache::get_or_populate_redis(self, format!("reverse_lookup_{id}"), database_call).await
}
}
}
#[cfg(feature = "kv_store")]
mod storage {
use error_stack::{IntoReport, ResultExt};
use redis_interface::SetnxReply;
use storage_impl::redis::kv_store::{kv_wrapper, KvOperation};
use super::{ReverseLookupInterface, Store};
use crate::{
connection,
errors::{self, CustomResult},
types::storage::{
enums, kv,
reverse_lookup::{ReverseLookup, ReverseLookupNew},
},
utils::{db_utils, storage_partitioning::PartitionKey},
};
#[async_trait::async_trait]
impl ReverseLookupInterface for Store {
async fn insert_reverse_lookup(
&self,
new: ReverseLookupNew,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ReverseLookup, errors::StorageError> {
match storage_scheme {
data_models::MerchantStorageScheme::PostgresOnly => {
let conn = connection::pg_connection_write(self).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
data_models::MerchantStorageScheme::RedisKv => {
let created_rev_lookup = ReverseLookup {
lookup_id: new.lookup_id.clone(),
sk_id: new.sk_id.clone(),
pk_id: new.pk_id.clone(),
source: new.source.clone(),
};
let combination = &created_rev_lookup.pk_id;
match kv_wrapper::<ReverseLookup, _, _>(
self,
KvOperation::SetNx(&created_rev_lookup),
format!("reverse_lookup_{}", &created_rev_lookup.lookup_id),
)
.await
.change_context(errors::StorageError::KVError)?
.try_into_setnx()
{
Ok(SetnxReply::KeySet) => {
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::ReverseLookUp(new),
},
};
self.push_to_drainer_stream::<ReverseLookup>(
redis_entry,
PartitionKey::MerchantIdPaymentIdCombination { combination },
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(created_rev_lookup)
}
Ok(SetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: "reverse_lookup",
key: Some(created_rev_lookup.lookup_id.clone()),
})
.into_report(),
Err(er) => Err(er).change_context(errors::StorageError::KVError),
}
}
}
}
async fn get_lookup_by_lookup_id(
&self,
id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ReverseLookup, errors::StorageError> {
let database_call = || async {
let conn = connection::pg_connection_read(self).await?;
ReverseLookup::find_by_lookup_id(id, &conn)
.await
.map_err(Into::into)
.into_report()
};
match storage_scheme {
data_models::MerchantStorageScheme::PostgresOnly => database_call().await,
data_models::MerchantStorageScheme::RedisKv => {
let redis_fut = async {
kv_wrapper(
self,
KvOperation::<ReverseLookup>::Get,
format!("reverse_lookup_{id}"),
)
.await?
.try_into_get()
};
db_utils::try_redis_get_else_try_database_get(redis_fut, database_call).await
}
}
}
}
}
@ -49,6 +170,7 @@ impl ReverseLookupInterface for MockDb {
async fn insert_reverse_lookup(
&self,
new: ReverseLookupNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ReverseLookup, errors::StorageError> {
let reverse_lookup_insert = ReverseLookup::from(new);
self.reverse_lookups
@ -57,9 +179,11 @@ impl ReverseLookupInterface for MockDb {
.push(reverse_lookup_insert.clone());
Ok(reverse_lookup_insert)
}
async fn get_lookup_by_lookup_id(
&self,
lookup_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ReverseLookup, errors::StorageError> {
self.reverse_lookups
.lock()