diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index 5804107d65..93d045bd71 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -8,6 +8,7 @@ use crate::{ payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate}, payment_intent::{PaymentIntent, PaymentIntentNew, PaymentIntentUpdate}, refund::{Refund, RefundNew, RefundUpdate}, + reverse_lookup::ReverseLookupNew, }; #[derive(Debug, Serialize, Deserialize)] @@ -43,6 +44,7 @@ pub enum Insertable { Refund(RefundNew), ConnectorResponse(ConnectorResponseNew), Address(Box), + ReverseLookUp(ReverseLookupNew), } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/diesel_models/src/reverse_lookup.rs b/crates/diesel_models/src/reverse_lookup.rs index a72b23accc..6483b7c78c 100644 --- a/crates/diesel_models/src/reverse_lookup.rs +++ b/crates/diesel_models/src/reverse_lookup.rs @@ -22,7 +22,14 @@ pub struct ReverseLookup { } #[derive( - Clone, Debug, Insertable, router_derive::DebugAsDisplay, Eq, PartialEq, serde::Serialize, + Clone, + Debug, + Insertable, + router_derive::DebugAsDisplay, + Eq, + PartialEq, + serde::Serialize, + serde::Deserialize, )] #[diesel(table_name = reverse_lookup)] pub struct ReverseLookupNew { diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index 7f431c5d25..e09c565405 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -188,6 +188,7 @@ async fn drainer( let payment_intent = "payment_intent"; let payment_attempt = "payment_attempt"; let refund = "refund"; + let reverse_lookup = "reverse_lookup"; let connector_response = "connector_response"; let address = "address"; match db_op { @@ -222,6 +223,13 @@ async fn drainer( kv::Insertable::Address(addr) => { macro_util::handle_resp!(addr.insert(&conn).await, insert_op, address) } + kv::Insertable::ReverseLookUp(rev) => { + macro_util::handle_resp!( + rev.insert(&conn).await, + insert_op, + reverse_lookup + ) + } } }) .await; diff --git a/crates/redis_interface/src/commands.rs b/crates/redis_interface/src/commands.rs index c44b14fee2..d53fd1625f 100644 --- a/crates/redis_interface/src/commands.rs +++ b/crates/redis_interface/src/commands.rs @@ -65,6 +65,22 @@ impl super::RedisConnectionPool { .change_context(errors::RedisError::SetFailed) } + #[instrument(level = "DEBUG", skip(self))] + pub async fn serialize_and_set_key_if_not_exist( + &self, + key: &str, + value: V, + ttl: Option, + ) -> CustomResult + where + V: serde::Serialize + Debug, + { + let serialized = Encode::::encode_to_vec(&value) + .change_context(errors::RedisError::JsonSerializationFailed)?; + self.set_key_if_not_exists_with_expiry(key, serialized.as_slice(), ttl) + .await + } + #[instrument(level = "DEBUG", skip(self))] pub async fn serialize_and_set_key( &self, diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index 9356444c4e..356c1c6a51 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -133,8 +133,9 @@ where } pub enum KvOperation<'a, S: serde::Serialize + Debug> { - Set((&'a str, String)), - SetNx(&'a str, S), + Hset((&'a str, String)), + SetNx(S), + HSetNx(&'a str, S), Get(&'a str), Scan(&'a str), } @@ -143,8 +144,9 @@ pub enum KvOperation<'a, S: serde::Serialize + Debug> { #[error(RedisError(UnknownResult))] pub enum KvResult { Get(T), - Set(()), - SetNx(redis_interface::HsetnxReply), + Hset(()), + SetNx(redis_interface::SetnxReply), + HSetNx(redis_interface::HsetnxReply), Scan(Vec), } @@ -163,11 +165,11 @@ where let type_name = std::any::type_name::(); match op { - KvOperation::Set(value) => { + KvOperation::Hset(value) => { redis_conn .set_hash_fields(key, value, Some(consts::KV_TTL)) .await?; - Ok(KvResult::Set(())) + Ok(KvResult::Hset(())) } KvOperation::Get(field) => { let result = redis_conn @@ -179,10 +181,16 @@ where let result: Vec = redis_conn.hscan_and_deserialize(key, pattern, None).await?; Ok(KvResult::Scan(result)) } - KvOperation::SetNx(field, value) => { + KvOperation::HSetNx(field, value) => { let result = redis_conn .serialize_and_set_hash_field_if_not_exist(key, field, value, Some(consts::KV_TTL)) .await?; + Ok(KvResult::HSetNx(result)) + } + KvOperation::SetNx(value) => { + let result = redis_conn + .serialize_and_set_key_if_not_exist(key, value, Some(consts::KV_TTL.into())) + .await?; Ok(KvResult::SetNx(result)) } } diff --git a/crates/router/src/db/address.rs b/crates/router/src/db/address.rs index 781df79713..dda6838c97 100644 --- a/crates/router/src/db/address.rs +++ b/crates/router/src/db/address.rs @@ -304,17 +304,17 @@ mod storage { let address = match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", merchant_id, payment_id); + let key = format!("mid_{}_pid_{}", merchant_id, payment_id); let field = format!("add_{}", address_id); db_utils::try_redis_get_else_try_database_get( async { kv_wrapper( self, - KvOperation::::Get(&field), + KvOperation::::HGet(&field), key, ) .await? - .try_into_get() + .try_into_hget() }, database_call, ) @@ -378,7 +378,7 @@ mod storage { .await } MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", merchant_id, payment_id); + let key = format!("mid_{}_pid_{}", merchant_id, payment_id); let field = format!("add_{}", &address_new.address_id); let created_address = diesel_models::Address { id: Some(0i32), @@ -403,12 +403,12 @@ mod storage { match kv_wrapper::( self, - KvOperation::SetNx(&field, &created_address), + KvOperation::HSetNx(&field, &created_address), &key, ) .await .change_context(errors::StorageError::KVError)? - .try_into_setnx() + .try_into_hsetnx() { Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { entity: "address", diff --git a/crates/router/src/db/connector_response.rs b/crates/router/src/db/connector_response.rs index dd08f123c4..c00eccb4d9 100644 --- a/crates/router/src/db/connector_response.rs +++ b/crates/router/src/db/connector_response.rs @@ -131,7 +131,7 @@ mod storage { let payment_id = &connector_response.payment_id; let attempt_id = &connector_response.attempt_id; - let key = format!("{merchant_id}_{payment_id}"); + let key = format!("mid_{merchant_id}_pid_{payment_id}"); let field = format!("connector_resp_{merchant_id}_{payment_id}_{attempt_id}"); let created_connector_resp = storage_type::ConnectorResponse { @@ -151,12 +151,12 @@ mod storage { match kv_wrapper::( self, - KvOperation::SetNx(&field, &created_connector_resp), + KvOperation::HSetNx(&field, &created_connector_resp), &key, ) .await .change_context(errors::StorageError::KVError)? - .try_into_setnx() + .try_into_hsetnx() { Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { entity: "address", @@ -211,18 +211,18 @@ mod storage { match storage_scheme { data_models::MerchantStorageScheme::PostgresOnly => database_call().await, data_models::MerchantStorageScheme::RedisKv => { - let key = format!("{merchant_id}_{payment_id}"); + let key = format!("mid_{merchant_id}_pid_{payment_id}"); let field = format!("connector_resp_{merchant_id}_{payment_id}_{attempt_id}"); db_utils::try_redis_get_else_try_database_get( async { kv_wrapper( self, - KvOperation::::Get(&field), + KvOperation::::HGet(&field), key, ) .await? - .try_into_get() + .try_into_hget() }, database_call, ) @@ -245,7 +245,7 @@ mod storage { .map_err(Into::into) .into_report(), data_models::MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", this.merchant_id, this.payment_id); + let key = format!("mid_{}_pid_{}", this.merchant_id, this.payment_id); let updated_connector_response = connector_response_update .clone() .apply_changeset(this.clone()); @@ -261,12 +261,12 @@ mod storage { kv_wrapper::<(), _, _>( self, - KvOperation::Set::((&field, redis_value)), + KvOperation::Hset::((&field, redis_value)), &key, ) .await .change_context(errors::StorageError::KVError)? - .try_into_set() + .try_into_hset() .change_context(errors::StorageError::KVError)?; let redis_entry = kv::TypedSql { diff --git a/crates/router/src/db/refund.rs b/crates/router/src/db/refund.rs index ddc3bcf416..dbf4941e08 100644 --- a/crates/router/src/db/refund.rs +++ b/crates/router/src/db/refund.rs @@ -305,18 +305,20 @@ mod storage { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { let lookup_id = format!("{merchant_id}_{internal_reference_id}"); - let lookup = self.get_lookup_by_lookup_id(&lookup_id).await?; + let lookup = self + .get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await?; let key = &lookup.pk_id; db_utils::try_redis_get_else_try_database_get( async { kv_wrapper( self, - KvOperation::::Get(&lookup.sk_id), + KvOperation::::HGet(&lookup.sk_id), key, ) .await? - .try_into_get() + .try_into_hget() }, database_call, ) @@ -336,7 +338,7 @@ mod storage { new.insert(&conn).await.map_err(Into::into).into_report() } enums::MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", new.merchant_id, new.payment_id); + let key = format!("mid_{}_pid_{}", new.merchant_id, new.payment_id); // TODO: need to add an application generated payment attempt id to distinguish between multiple attempts for the same payment id // Check for database presence as well Maybe use a read replica here ? let created_refund = storage_types::Refund { @@ -373,12 +375,12 @@ mod storage { ); match kv_wrapper::( self, - KvOperation::SetNx(&field, &created_refund), + KvOperation::HSetNx(&field, &created_refund), &key, ) .await .change_context(errors::StorageError::KVError)? - .try_into_setnx() + .try_into_hsetnx() { Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { entity: "refund", @@ -386,8 +388,6 @@ mod storage { }) .into_report(), Ok(HsetnxReply::KeySet) => { - let conn = connection::pg_connection_write(self).await?; - let mut reverse_lookups = vec![ storage_types::ReverseLookupNew { sk_id: field.clone(), @@ -425,9 +425,11 @@ mod storage { source: "refund".to_string(), }) }; - storage_types::ReverseLookupNew::batch_insert(reverse_lookups, &conn) - .await - .change_context(errors::StorageError::KVError)?; + let rev_look = reverse_lookups + .into_iter() + .map(|rev| self.insert_reverse_lookup(rev, storage_scheme)); + + futures::future::try_join_all(rev_look).await?; let redis_entry = kv::TypedSql { op: kv::DBOperation::Insert { @@ -473,7 +475,10 @@ mod storage { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { let lookup_id = format!("{merchant_id}_{connector_transaction_id}"); - let lookup = match self.get_lookup_by_lookup_id(&lookup_id).await { + let lookup = match self + .get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await + { Ok(l) => l, Err(err) => { logger::error!(?err); @@ -516,7 +521,7 @@ mod storage { .into_report() } enums::MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", this.merchant_id, this.payment_id); + let key = format!("mid_{}_pid_{}", this.merchant_id, this.payment_id); let field = format!("pa_{}_ref_{}", &this.attempt_id, &this.refund_id); let updated_refund = refund.clone().apply_changeset(this.clone()); @@ -528,12 +533,12 @@ mod storage { kv_wrapper::<(), _, _>( self, - KvOperation::Set::((&field, redis_value)), + KvOperation::Hset::((&field, redis_value)), &key, ) .await .change_context(errors::StorageError::KVError)? - .try_into_set() + .try_into_hset() .change_context(errors::StorageError::KVError)?; let redis_entry = kv::TypedSql { @@ -575,18 +580,20 @@ mod storage { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { let lookup_id = format!("{merchant_id}_{refund_id}"); - let lookup = self.get_lookup_by_lookup_id(&lookup_id).await?; + let lookup = self + .get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await?; let key = &lookup.pk_id; db_utils::try_redis_get_else_try_database_get( async { kv_wrapper( self, - KvOperation::::Get(&lookup.sk_id), + KvOperation::::HGet(&lookup.sk_id), key, ) .await? - .try_into_get() + .try_into_hget() }, database_call, ) @@ -618,18 +625,20 @@ mod storage { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { let lookup_id = format!("{merchant_id}_{connector_refund_id}_{connector}"); - let lookup = self.get_lookup_by_lookup_id(&lookup_id).await?; + let lookup = self + .get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await?; let key = &lookup.pk_id; db_utils::try_redis_get_else_try_database_get( async { kv_wrapper( self, - KvOperation::::Get(&lookup.sk_id), + KvOperation::::HGet(&lookup.sk_id), key, ) .await? - .try_into_get() + .try_into_hget() }, database_call, ) @@ -658,7 +667,7 @@ mod storage { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { - let key = format!("{merchant_id}_{payment_id}"); + let key = format!("mid_{merchant_id}_pid_{payment_id}"); db_utils::try_redis_get_else_try_database_get( async { kv_wrapper( diff --git a/crates/router/src/db/reverse_lookup.rs b/crates/router/src/db/reverse_lookup.rs index 46e89f62e7..7e660224d5 100644 --- a/crates/router/src/db/reverse_lookup.rs +++ b/crates/router/src/db/reverse_lookup.rs @@ -1,10 +1,10 @@ -use error_stack::IntoReport; - -use super::{cache, MockDb, Store}; +use super::{MockDb, Store}; use crate::{ - connection, errors::{self, CustomResult}, - types::storage::reverse_lookup::{ReverseLookup, ReverseLookupNew}, + types::storage::{ + enums, + reverse_lookup::{ReverseLookup, ReverseLookupNew}, + }, }; #[async_trait::async_trait] @@ -12,35 +12,156 @@ pub trait ReverseLookupInterface { async fn insert_reverse_lookup( &self, _new: ReverseLookupNew, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn get_lookup_by_lookup_id( &self, _id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; } -#[async_trait::async_trait] -impl ReverseLookupInterface for Store { - async fn insert_reverse_lookup( - &self, - new: ReverseLookupNew, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - new.insert(&conn).await.map_err(Into::into).into_report() - } +#[cfg(not(feature = "kv_store"))] +mod storage { + use error_stack::IntoReport; - async fn get_lookup_by_lookup_id( - &self, - id: &str, - ) -> CustomResult { - let database_call = || async { + use super::{ReverseLookupInterface, Store}; + use crate::{ + connection, + errors::{self, CustomResult}, + types::storage::{ + enums, + reverse_lookup::{ReverseLookup, ReverseLookupNew}, + }, + }; + + #[async_trait::async_trait] + impl ReverseLookupInterface for Store { + async fn insert_reverse_lookup( + &self, + new: ReverseLookupNew, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + new.insert(&conn).await.map_err(Into::into).into_report() + } + + async fn get_lookup_by_lookup_id( + &self, + id: &str, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { let conn = connection::pg_connection_read(self).await?; ReverseLookup::find_by_lookup_id(id, &conn) .await .map_err(Into::into) .into_report() - }; - cache::get_or_populate_redis(self, format!("reverse_lookup_{id}"), database_call).await + } + } +} + +#[cfg(feature = "kv_store")] +mod storage { + use error_stack::{IntoReport, ResultExt}; + use redis_interface::SetnxReply; + use storage_impl::redis::kv_store::{kv_wrapper, KvOperation}; + + use super::{ReverseLookupInterface, Store}; + use crate::{ + connection, + errors::{self, CustomResult}, + types::storage::{ + enums, kv, + reverse_lookup::{ReverseLookup, ReverseLookupNew}, + }, + utils::{db_utils, storage_partitioning::PartitionKey}, + }; + + #[async_trait::async_trait] + impl ReverseLookupInterface for Store { + async fn insert_reverse_lookup( + &self, + new: ReverseLookupNew, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + match storage_scheme { + data_models::MerchantStorageScheme::PostgresOnly => { + let conn = connection::pg_connection_write(self).await?; + new.insert(&conn).await.map_err(Into::into).into_report() + } + data_models::MerchantStorageScheme::RedisKv => { + let created_rev_lookup = ReverseLookup { + lookup_id: new.lookup_id.clone(), + sk_id: new.sk_id.clone(), + pk_id: new.pk_id.clone(), + source: new.source.clone(), + }; + let combination = &created_rev_lookup.pk_id; + match kv_wrapper::( + self, + KvOperation::SetNx(&created_rev_lookup), + format!("reverse_lookup_{}", &created_rev_lookup.lookup_id), + ) + .await + .change_context(errors::StorageError::KVError)? + .try_into_setnx() + { + Ok(SetnxReply::KeySet) => { + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: kv::Insertable::ReverseLookUp(new), + }, + }; + self.push_to_drainer_stream::( + redis_entry, + PartitionKey::MerchantIdPaymentIdCombination { combination }, + ) + .await + .change_context(errors::StorageError::KVError)?; + + Ok(created_rev_lookup) + } + Ok(SetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { + entity: "reverse_lookup", + key: Some(created_rev_lookup.lookup_id.clone()), + }) + .into_report(), + Err(er) => Err(er).change_context(errors::StorageError::KVError), + } + } + } + } + + async fn get_lookup_by_lookup_id( + &self, + id: &str, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let database_call = || async { + let conn = connection::pg_connection_read(self).await?; + ReverseLookup::find_by_lookup_id(id, &conn) + .await + .map_err(Into::into) + .into_report() + }; + + match storage_scheme { + data_models::MerchantStorageScheme::PostgresOnly => database_call().await, + data_models::MerchantStorageScheme::RedisKv => { + let redis_fut = async { + kv_wrapper( + self, + KvOperation::::Get, + format!("reverse_lookup_{id}"), + ) + .await? + .try_into_get() + }; + + db_utils::try_redis_get_else_try_database_get(redis_fut, database_call).await + } + } + } } } @@ -49,6 +170,7 @@ impl ReverseLookupInterface for MockDb { async fn insert_reverse_lookup( &self, new: ReverseLookupNew, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let reverse_lookup_insert = ReverseLookup::from(new); self.reverse_lookups @@ -57,9 +179,11 @@ impl ReverseLookupInterface for MockDb { .push(reverse_lookup_insert.clone()); Ok(reverse_lookup_insert) } + async fn get_lookup_by_lookup_id( &self, lookup_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { self.reverse_lookups .lock() diff --git a/crates/router_derive/src/lib.rs b/crates/router_derive/src/lib.rs index 0a7ec822e9..3f34c156ae 100644 --- a/crates/router_derive/src/lib.rs +++ b/crates/router_derive/src/lib.rs @@ -544,7 +544,7 @@ pub fn validate_config(input: proc_macro::TokenStream) -> proc_macro::TokenStrea /// ``` /// #[derive(TryGetEnumVariant)] /// #[error(RedisError(UnknownResult))] -/// struct Result { +/// enum Result { /// Set(String), /// Get(i32) /// } @@ -564,7 +564,7 @@ pub fn validate_config(input: proc_macro::TokenStream) -> proc_macro::TokenStrea /// fn try_into_set(&self)-> Result { /// match self { /// Self::Set(a) => Ok(a), -/// _=>Err(RedisError::UnknownResult) +/// _=> Err(RedisError::UnknownResult) /// } /// } /// } diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index bcc5c79257..58f4d4956f 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -18,6 +18,7 @@ pub mod mock_db; pub mod payments; pub mod redis; pub mod refund; +mod reverse_lookup; mod utils; use database::store::PgPool; diff --git a/crates/storage_impl/src/lookup.rs b/crates/storage_impl/src/lookup.rs index 7c425c7806..ae4d1f7339 100644 --- a/crates/storage_impl/src/lookup.rs +++ b/crates/storage_impl/src/lookup.rs @@ -1,21 +1,32 @@ use common_utils::errors::CustomResult; use data_models::errors; -use diesel_models::reverse_lookup::{ - ReverseLookup as DieselReverseLookup, ReverseLookupNew as DieselReverseLookupNew, +use diesel_models::{ + kv, + reverse_lookup::{ + ReverseLookup as DieselReverseLookup, ReverseLookupNew as DieselReverseLookupNew, + }, }; use error_stack::{IntoReport, ResultExt}; +use redis_interface::SetnxReply; -use crate::{redis::cache::get_or_populate_redis, DatabaseStore, KVRouterStore, RouterStore}; +use crate::{ + diesel_error_to_data_error, + redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, + utils::{self, try_redis_get_else_try_database_get}, + DatabaseStore, KVRouterStore, RouterStore, +}; #[async_trait::async_trait] pub trait ReverseLookupInterface { async fn insert_reverse_lookup( &self, _new: DieselReverseLookupNew, + storage_scheme: data_models::MerchantStorageScheme, ) -> CustomResult; async fn get_lookup_by_lookup_id( &self, _id: &str, + storage_scheme: data_models::MerchantStorageScheme, ) -> CustomResult; } @@ -24,6 +35,7 @@ impl ReverseLookupInterface for RouterStore { async fn insert_reverse_lookup( &self, new: DieselReverseLookupNew, + _storage_scheme: data_models::MerchantStorageScheme, ) -> CustomResult { let conn = self .get_master_pool() @@ -32,7 +44,7 @@ impl ReverseLookupInterface for RouterStore { .into_report() .change_context(errors::StorageError::DatabaseConnectionError)?; new.insert(&conn).await.map_err(|er| { - let new_err = crate::diesel_error_to_data_error(er.current_context()); + let new_err = diesel_error_to_data_error(er.current_context()); er.change_context(new_err) }) } @@ -40,17 +52,15 @@ impl ReverseLookupInterface for RouterStore { async fn get_lookup_by_lookup_id( &self, id: &str, + _storage_scheme: data_models::MerchantStorageScheme, ) -> CustomResult { - let database_call = || async { - let conn = crate::utils::pg_connection_read(self).await?; - DieselReverseLookup::find_by_lookup_id(id, &conn) - .await - .map_err(|er| { - let new_err = crate::diesel_error_to_data_error(er.current_context()); - er.change_context(new_err) - }) - }; - get_or_populate_redis(self, format!("reverse_lookup_{id}"), database_call).await + let conn = utils::pg_connection_read(self).await?; + DieselReverseLookup::find_by_lookup_id(id, &conn) + .await + .map_err(|er| { + let new_err = diesel_error_to_data_error(er.current_context()); + er.change_context(new_err) + }) } } @@ -59,14 +69,82 @@ impl ReverseLookupInterface for KVRouterStore { async fn insert_reverse_lookup( &self, new: DieselReverseLookupNew, + storage_scheme: data_models::MerchantStorageScheme, ) -> CustomResult { - self.router_store.insert_reverse_lookup(new).await + match storage_scheme { + data_models::MerchantStorageScheme::PostgresOnly => { + self.router_store + .insert_reverse_lookup(new, storage_scheme) + .await + } + data_models::MerchantStorageScheme::RedisKv => { + let created_rev_lookup = DieselReverseLookup { + lookup_id: new.lookup_id.clone(), + sk_id: new.sk_id.clone(), + pk_id: new.pk_id.clone(), + source: new.source.clone(), + }; + let combination = &created_rev_lookup.pk_id; + match kv_wrapper::( + self, + KvOperation::SetNx(&created_rev_lookup), + format!("reverse_lookup_{}", &created_rev_lookup.lookup_id), + ) + .await + .change_context(errors::StorageError::KVError)? + .try_into_setnx() + { + Ok(SetnxReply::KeySet) => { + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: kv::Insertable::ReverseLookUp(new), + }, + }; + self.push_to_drainer_stream::( + redis_entry, + PartitionKey::MerchantIdPaymentIdCombination { combination }, + ) + .await + .change_context(errors::StorageError::KVError)?; + + Ok(created_rev_lookup) + } + Ok(SetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { + entity: "reverse_lookup", + key: Some(created_rev_lookup.lookup_id.clone()), + }) + .into_report(), + Err(er) => Err(er).change_context(errors::StorageError::KVError), + } + } + } } async fn get_lookup_by_lookup_id( &self, id: &str, + storage_scheme: data_models::MerchantStorageScheme, ) -> CustomResult { - self.router_store.get_lookup_by_lookup_id(id).await + let database_call = || async { + self.router_store + .get_lookup_by_lookup_id(id, storage_scheme) + .await + }; + match storage_scheme { + data_models::MerchantStorageScheme::PostgresOnly => database_call().await, + data_models::MerchantStorageScheme::RedisKv => { + let redis_fut = async { + kv_wrapper( + self, + KvOperation::::Get, + format!("reverse_lookup_{id}"), + ) + .await? + .try_into_get() + }; + + try_redis_get_else_try_database_get(redis_fut, database_call).await + } + } } } diff --git a/crates/storage_impl/src/payments/payment_attempt.rs b/crates/storage_impl/src/payments/payment_attempt.rs index 4911fdfdc3..4764ce68a3 100644 --- a/crates/storage_impl/src/payments/payment_attempt.rs +++ b/crates/storage_impl/src/payments/payment_attempt.rs @@ -308,7 +308,7 @@ impl PaymentAttemptInterface for KVRouterStore { } MerchantStorageScheme::RedisKv => { let key = format!( - "{}_{}", + "mid_{}_pid_{}", payment_attempt.merchant_id, payment_attempt.payment_id ); @@ -365,12 +365,12 @@ impl PaymentAttemptInterface for KVRouterStore { match kv_wrapper::( self, - KvOperation::SetNx(&field, &created_attempt), + KvOperation::HSetNx(&field, &created_attempt), &key, ) .await .change_context(errors::StorageError::KVError)? - .try_into_setnx() + .try_into_hsetnx() { Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { entity: "payment attempt", @@ -378,10 +378,8 @@ impl PaymentAttemptInterface for KVRouterStore { }) .into_report(), Ok(HsetnxReply::KeySet) => { - let conn = pg_connection_write(self).await?; - //Reverse lookup for attempt_id - ReverseLookupNew { + let reverse_lookup = ReverseLookupNew { lookup_id: format!( "{}_{}", &created_attempt.merchant_id, &created_attempt.attempt_id, @@ -389,13 +387,9 @@ impl PaymentAttemptInterface for KVRouterStore { pk_id: key, sk_id: field, source: "payment_attempt".to_string(), - } - .insert(&conn) - .await - .map_err(|er| { - let new_err = diesel_error_to_data_error(er.current_context()); - er.change_context(new_err) - })?; + }; + self.insert_reverse_lookup(reverse_lookup, storage_scheme) + .await?; let redis_entry = kv::TypedSql { op: kv::DBOperation::Insert { @@ -435,7 +429,7 @@ impl PaymentAttemptInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", this.merchant_id, this.payment_id); + let key = format!("mid_{}_pid_{}", this.merchant_id, this.payment_id); let old_connector_transaction_id = &this.connector_transaction_id; let old_preprocessing_id = &this.preprocessing_step_id; let updated_attempt = PaymentAttempt::from_storage_model( @@ -452,12 +446,12 @@ impl PaymentAttemptInterface for KVRouterStore { kv_wrapper::<(), _, _>( self, - KvOperation::Set::((&field, redis_value)), + KvOperation::Hset::((&field, redis_value)), &key, ) .await .change_context(errors::StorageError::KVError)? - .try_into_set() + .try_into_hset() .change_context(errors::StorageError::KVError)?; match ( @@ -466,22 +460,24 @@ impl PaymentAttemptInterface for KVRouterStore { ) { (None, Some(connector_transaction_id)) => { add_connector_txn_id_to_reverse_lookup( - &self.router_store, + self, key.as_str(), this.merchant_id.as_str(), updated_attempt.attempt_id.as_str(), connector_transaction_id.as_str(), + storage_scheme, ) .await?; } (Some(old_connector_transaction_id), Some(connector_transaction_id)) => { if old_connector_transaction_id.ne(connector_transaction_id) { add_connector_txn_id_to_reverse_lookup( - &self.router_store, + self, key.as_str(), this.merchant_id.as_str(), updated_attempt.attempt_id.as_str(), connector_transaction_id.as_str(), + storage_scheme, ) .await?; } @@ -492,22 +488,24 @@ impl PaymentAttemptInterface for KVRouterStore { match (old_preprocessing_id, &updated_attempt.preprocessing_step_id) { (None, Some(preprocessing_id)) => { add_preprocessing_id_to_reverse_lookup( - &self.router_store, + self, key.as_str(), this.merchant_id.as_str(), updated_attempt.attempt_id.as_str(), preprocessing_id.as_str(), + storage_scheme, ) .await?; } (Some(old_preprocessing_id), Some(preprocessing_id)) => { if old_preprocessing_id.ne(preprocessing_id) { add_preprocessing_id_to_reverse_lookup( - &self.router_store, + self, key.as_str(), this.merchant_id.as_str(), updated_attempt.attempt_id.as_str(), preprocessing_id.as_str(), + storage_scheme, ) .await?; } @@ -560,12 +558,14 @@ impl PaymentAttemptInterface for KVRouterStore { MerchantStorageScheme::RedisKv => { // We assume that PaymentAttempt <=> PaymentIntent is a one-to-one relation for now let lookup_id = format!("{merchant_id}_{connector_transaction_id}"); - let lookup = self.get_lookup_by_lookup_id(&lookup_id).await?; + let lookup = self + .get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await?; let key = &lookup.pk_id; try_redis_get_else_try_database_get( async { - kv_wrapper(self, KvOperation::::Get(&lookup.sk_id), key).await?.try_into_get() + kv_wrapper(self, KvOperation::::HGet(&lookup.sk_id), key).await?.try_into_hget() }, || async {self.router_store.find_payment_attempt_by_connector_transaction_id_payment_id_merchant_id(connector_transaction_id, payment_id, merchant_id, storage_scheme).await}, ) @@ -591,7 +591,7 @@ impl PaymentAttemptInterface for KVRouterStore { match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { - let key = format!("{merchant_id}_{payment_id}"); + let key = format!("mid_{merchant_id}_pid_{payment_id}"); let pattern = "pa_*"; let redis_fut = async { @@ -636,14 +636,20 @@ impl PaymentAttemptInterface for KVRouterStore { } MerchantStorageScheme::RedisKv => { let lookup_id = format!("{merchant_id}_{connector_txn_id}"); - let lookup = self.get_lookup_by_lookup_id(&lookup_id).await?; + let lookup = self + .get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await?; let key = &lookup.pk_id; try_redis_get_else_try_database_get( async { - kv_wrapper(self, KvOperation::::Get(&lookup.sk_id), key) - .await? - .try_into_get() + kv_wrapper( + self, + KvOperation::::HGet(&lookup.sk_id), + key, + ) + .await? + .try_into_hget() }, || async { self.router_store @@ -680,13 +686,13 @@ impl PaymentAttemptInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = format!("{merchant_id}_{payment_id}"); + let key = format!("mid_{merchant_id}_pid_{payment_id}"); let field = format!("pa_{attempt_id}"); try_redis_get_else_try_database_get( async { - kv_wrapper(self, KvOperation::::Get(&field), key) + kv_wrapper(self, KvOperation::::HGet(&field), key) .await? - .try_into_get() + .try_into_hget() }, || async { self.router_store @@ -722,13 +728,19 @@ impl PaymentAttemptInterface for KVRouterStore { } MerchantStorageScheme::RedisKv => { let lookup_id = format!("{merchant_id}_{attempt_id}"); - let lookup = self.get_lookup_by_lookup_id(&lookup_id).await?; + let lookup = self + .get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await?; let key = &lookup.pk_id; try_redis_get_else_try_database_get( async { - kv_wrapper(self, KvOperation::::Get(&lookup.sk_id), key) - .await? - .try_into_get() + kv_wrapper( + self, + KvOperation::::HGet(&lookup.sk_id), + key, + ) + .await? + .try_into_hget() }, || async { self.router_store @@ -763,14 +775,20 @@ impl PaymentAttemptInterface for KVRouterStore { } MerchantStorageScheme::RedisKv => { let lookup_id = format!("{merchant_id}_{preprocessing_id}"); - let lookup = self.get_lookup_by_lookup_id(&lookup_id).await?; + let lookup = self + .get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await?; let key = &lookup.pk_id; try_redis_get_else_try_database_get( async { - kv_wrapper(self, KvOperation::::Get(&lookup.sk_id), key) - .await? - .try_into_get() + kv_wrapper( + self, + KvOperation::::HGet(&lookup.sk_id), + key, + ) + .await? + .try_into_hget() }, || async { self.router_store @@ -804,12 +822,12 @@ impl PaymentAttemptInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = format!("{merchant_id}_{payment_id}"); + let key = format!("mid_{merchant_id}_pid_{payment_id}"); kv_wrapper(self, KvOperation::::Scan("pa_*"), key) .await .change_context(errors::StorageError::KVError)? - .try_into_get() + .try_into_scan() .change_context(errors::StorageError::KVError) } } @@ -1488,48 +1506,42 @@ impl DataModelExt for PaymentAttemptUpdate { #[inline] async fn add_connector_txn_id_to_reverse_lookup( - store: &RouterStore, + store: &KVRouterStore, key: &str, merchant_id: &str, updated_attempt_attempt_id: &str, connector_transaction_id: &str, + storage_scheme: MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection_write(store).await?; let field = format!("pa_{}", updated_attempt_attempt_id); - ReverseLookupNew { + let reverse_lookup_new = ReverseLookupNew { lookup_id: format!("{}_{}", merchant_id, connector_transaction_id), pk_id: key.to_owned(), sk_id: field.clone(), source: "payment_attempt".to_string(), - } - .insert(&conn) - .await - .map_err(|err| { - let new_err = diesel_error_to_data_error(err.current_context()); - err.change_context(new_err) - }) + }; + store + .insert_reverse_lookup(reverse_lookup_new, storage_scheme) + .await } #[inline] async fn add_preprocessing_id_to_reverse_lookup( - store: &RouterStore, + store: &KVRouterStore, key: &str, merchant_id: &str, updated_attempt_attempt_id: &str, preprocessing_id: &str, + storage_scheme: MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection_write(store).await?; let field = format!("pa_{}", updated_attempt_attempt_id); - ReverseLookupNew { + let reverse_lookup_new = ReverseLookupNew { lookup_id: format!("{}_{}", merchant_id, preprocessing_id), pk_id: key.to_owned(), sk_id: field.clone(), source: "payment_attempt".to_string(), - } - .insert(&conn) - .await - .map_err(|er| { - let new_err = diesel_error_to_data_error(er.current_context()); - er.change_context(new_err) - }) + }; + store + .insert_reverse_lookup(reverse_lookup_new, storage_scheme) + .await } diff --git a/crates/storage_impl/src/payments/payment_intent.rs b/crates/storage_impl/src/payments/payment_intent.rs index beb9a84b30..6814512028 100644 --- a/crates/storage_impl/src/payments/payment_intent.rs +++ b/crates/storage_impl/src/payments/payment_intent.rs @@ -55,7 +55,7 @@ impl PaymentIntentInterface for KVRouterStore { } MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", new.merchant_id, new.payment_id); + let key = format!("mid_{}_pid_{}", new.merchant_id, new.payment_id); let field = format!("pi_{}", new.payment_id); let created_intent = PaymentIntent { id: 0i32, @@ -95,12 +95,12 @@ impl PaymentIntentInterface for KVRouterStore { match kv_wrapper::( self, - KvOperation::SetNx(&field, &created_intent), + KvOperation::HSetNx(&field, &created_intent), &key, ) .await .change_context(StorageError::KVError)? - .try_into_setnx() + .try_into_hsetnx() { Ok(HsetnxReply::KeyNotSet) => Err(StorageError::DuplicateValue { entity: "payment_intent", @@ -144,7 +144,7 @@ impl PaymentIntentInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", this.merchant_id, this.payment_id); + let key = format!("mid_{}_pid_{}", this.merchant_id, this.payment_id); let field = format!("pi_{}", this.payment_id); let updated_intent = payment_intent.clone().apply_changeset(this.clone()); @@ -156,12 +156,12 @@ impl PaymentIntentInterface for KVRouterStore { kv_wrapper::<(), _, _>( self, - KvOperation::::Set((&field, redis_value)), + KvOperation::::Hset((&field, redis_value)), &key, ) .await .change_context(StorageError::KVError)? - .try_into_set() + .try_into_hset() .change_context(StorageError::KVError)?; let redis_entry = kv::TypedSql { @@ -209,17 +209,17 @@ impl PaymentIntentInterface for KVRouterStore { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { - let key = format!("{merchant_id}_{payment_id}"); + let key = format!("mid_{merchant_id}_pid_{payment_id}"); let field = format!("pi_{payment_id}"); crate::utils::try_redis_get_else_try_database_get( async { kv_wrapper::( self, - KvOperation::::Get(&field), + KvOperation::::HGet(&field), &key, ) .await? - .try_into_get() + .try_into_hget() }, database_call, ) diff --git a/crates/storage_impl/src/redis/kv_store.rs b/crates/storage_impl/src/redis/kv_store.rs index 4f909f0b35..e9600d04de 100644 --- a/crates/storage_impl/src/redis/kv_store.rs +++ b/crates/storage_impl/src/redis/kv_store.rs @@ -23,6 +23,9 @@ pub enum PartitionKey<'a> { merchant_id: &'a str, payment_id: &'a str, }, + MerchantIdPaymentIdCombination { + combination: &'a str, + }, } impl<'a> std::fmt::Display for PartitionKey<'a> { @@ -32,6 +35,9 @@ impl<'a> std::fmt::Display for PartitionKey<'a> { merchant_id, payment_id, } => f.write_str(&format!("mid_{merchant_id}_pid_{payment_id}")), + PartitionKey::MerchantIdPaymentIdCombination { combination } => { + f.write_str(combination) + } } } } @@ -43,18 +49,22 @@ pub trait RedisConnInterface { } pub enum KvOperation<'a, S: serde::Serialize + Debug> { - Set((&'a str, String)), - SetNx(&'a str, S), - Get(&'a str), + Hset((&'a str, String)), + SetNx(S), + HSetNx(&'a str, S), + HGet(&'a str), + Get, Scan(&'a str), } #[derive(TryGetEnumVariant)] #[error(RedisError(UnknownResult))] pub enum KvResult { + HGet(T), Get(T), - Set(()), - SetNx(redis_interface::HsetnxReply), + Hset(()), + SetNx(redis_interface::SetnxReply), + HSetNx(redis_interface::HsetnxReply), Scan(Vec), } @@ -74,27 +84,37 @@ where let type_name = std::any::type_name::(); match op { - KvOperation::Set(value) => { + KvOperation::Hset(value) => { redis_conn .set_hash_fields(key, value, Some(consts::KV_TTL)) .await?; - Ok(KvResult::Set(())) + Ok(KvResult::Hset(())) } - KvOperation::Get(field) => { + KvOperation::HGet(field) => { let result = redis_conn .get_hash_field_and_deserialize(key, field, type_name) .await?; - Ok(KvResult::Get(result)) + Ok(KvResult::HGet(result)) } KvOperation::Scan(pattern) => { let result: Vec = redis_conn.hscan_and_deserialize(key, pattern, None).await?; Ok(KvResult::Scan(result)) } - KvOperation::SetNx(field, value) => { + KvOperation::HSetNx(field, value) => { let result = redis_conn .serialize_and_set_hash_field_if_not_exist(key, field, value, Some(consts::KV_TTL)) .await?; + Ok(KvResult::HSetNx(result)) + } + KvOperation::SetNx(value) => { + let result = redis_conn + .serialize_and_set_key_if_not_exist(key, value, Some(consts::KV_TTL.into())) + .await?; Ok(KvResult::SetNx(result)) } + KvOperation::Get => { + let result = redis_conn.get_and_deserialize_key(key, type_name).await?; + Ok(KvResult::Get(result)) + } } } diff --git a/crates/storage_impl/src/reverse_lookup.rs b/crates/storage_impl/src/reverse_lookup.rs new file mode 100644 index 0000000000..9466267a3c --- /dev/null +++ b/crates/storage_impl/src/reverse_lookup.rs @@ -0,0 +1,5 @@ +use diesel_models::reverse_lookup::ReverseLookup; + +use crate::redis::kv_store::KvStorePartition; + +impl KvStorePartition for ReverseLookup {}