diff --git a/crates/diesel_models/src/connector_response.rs b/crates/diesel_models/src/connector_response.rs index d67ba18a1b..401a6eec91 100644 --- a/crates/diesel_models/src/connector_response.rs +++ b/crates/diesel_models/src/connector_response.rs @@ -24,7 +24,6 @@ pub struct ConnectorResponseNew { #[derive(Clone, Debug, Deserialize, Serialize, Identifiable, Queryable)] #[diesel(table_name = connector_response)] pub struct ConnectorResponse { - #[serde(skip_serializing)] pub id: i32, pub payment_id: String, pub merchant_id: String, @@ -49,7 +48,7 @@ pub struct ConnectorResponseUpdateInternal { pub connector_name: Option, } -#[derive(Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum ConnectorResponseUpdate { ResponseUpdate { connector_transaction_id: Option, diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index eba4ce243a..5804107d65 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize}; use crate::{ address::AddressNew, + connector_response::{ConnectorResponse, ConnectorResponseNew, ConnectorResponseUpdate}, errors, payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate}, payment_intent::{PaymentIntent, PaymentIntentNew, PaymentIntentUpdate}, @@ -40,6 +41,7 @@ pub enum Insertable { PaymentIntent(PaymentIntentNew), PaymentAttempt(PaymentAttemptNew), Refund(RefundNew), + ConnectorResponse(ConnectorResponseNew), Address(Box), } @@ -49,6 +51,13 @@ pub enum Updateable { PaymentIntentUpdate(PaymentIntentUpdateMems), PaymentAttemptUpdate(PaymentAttemptUpdateMems), RefundUpdate(RefundUpdateMems), + ConnectorResponseUpdate(ConnectorResponseUpdateMems), +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ConnectorResponseUpdateMems { + pub orig: ConnectorResponse, + pub update_data: ConnectorResponseUpdate, } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/diesel_models/src/query/connector_response.rs b/crates/diesel_models/src/query/connector_response.rs index 4ec41a2ad2..eea7e779d8 100644 --- a/crates/diesel_models/src/query/connector_response.rs +++ b/crates/diesel_models/src/query/connector_response.rs @@ -26,9 +26,17 @@ impl ConnectorResponse { conn: &PgPooledConn, connector_response: ConnectorResponseUpdate, ) -> StorageResult { - match generics::generic_update_by_id::<::Table, _, _, _>( + match generics::generic_update_with_unique_predicate_get_result::< + ::Table, + _, + _, + _, + >( conn, - self.id, + dsl::merchant_id + .eq(self.merchant_id.clone()) + .and(dsl::payment_id.eq(self.payment_id.clone())) + .and(dsl::attempt_id.eq(self.attempt_id.clone())), ConnectorResponseUpdateInternal::from(connector_response), ) .await diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index c832c67c92..fd3fa47d80 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -165,6 +165,7 @@ async fn drainer( let payment_intent = "payment_intent"; let payment_attempt = "payment_attempt"; let refund = "refund"; + let connector_response = "connector_response"; let address = "address"; match db_op { // TODO: Handle errors @@ -188,6 +189,13 @@ async fn drainer( kv::Insertable::Refund(a) => { macro_util::handle_resp!(a.insert(&conn).await, insert_op, refund) } + kv::Insertable::ConnectorResponse(a) => { + macro_util::handle_resp!( + a.insert(&conn).await, + insert_op, + connector_response + ) + } kv::Insertable::Address(addr) => { macro_util::handle_resp!(addr.insert(&conn).await, insert_op, address) } @@ -227,6 +235,11 @@ async fn drainer( refund ) } + kv::Updateable::ConnectorResponseUpdate(a) => macro_util::handle_resp!( + a.orig.update(&conn, a.update_data).await, + update_op, + connector_response + ), } }) .await; diff --git a/crates/router/src/db/connector_response.rs b/crates/router/src/db/connector_response.rs index 7a3121711f..331aea03d4 100644 --- a/crates/router/src/db/connector_response.rs +++ b/crates/router/src/db/connector_response.rs @@ -3,18 +3,17 @@ use router_env::{instrument, tracing}; use super::{MockDb, Store}; use crate::{ - connection, core::errors::{self, CustomResult}, - types::storage::{self, enums}, + types::storage::{self as storage_type, enums}, }; #[async_trait::async_trait] pub trait ConnectorResponseInterface { async fn insert_connector_response( &self, - connector_response: storage::ConnectorResponseNew, + connector_response: storage_type::ConnectorResponseNew, storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult; + ) -> CustomResult; async fn find_connector_response_by_payment_id_merchant_id_attempt_id( &self, @@ -22,63 +21,272 @@ pub trait ConnectorResponseInterface { merchant_id: &str, attempt_id: &str, storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult; + ) -> CustomResult; async fn update_connector_response( &self, - this: storage::ConnectorResponse, - payment_attempt: storage::ConnectorResponseUpdate, + this: storage_type::ConnectorResponse, + payment_attempt: storage_type::ConnectorResponseUpdate, storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult; + ) -> CustomResult; } -#[async_trait::async_trait] -impl ConnectorResponseInterface for Store { - #[instrument(skip_all)] - async fn insert_connector_response( - &self, - connector_response: storage::ConnectorResponseNew, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - connector_response - .insert(&conn) +#[cfg(not(feature = "kv_store"))] +mod storage { + use error_stack::IntoReport; + use router_env::{instrument, tracing}; + + use super::Store; + use crate::{ + connection, + core::errors::{self, CustomResult}, + types::storage::{self as storage_type, enums}, + }; + + #[async_trait::async_trait] + impl super::ConnectorResponseInterface for Store { + #[instrument(skip_all)] + async fn insert_connector_response( + &self, + connector_response: storage_type::ConnectorResponseNew, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + connector_response + .insert(&conn) + .await + .map_err(Into::into) + .into_report() + } + + #[instrument(skip_all)] + async fn find_connector_response_by_payment_id_merchant_id_attempt_id( + &self, + payment_id: &str, + merchant_id: &str, + attempt_id: &str, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + storage_type::ConnectorResponse::find_by_payment_id_merchant_id_attempt_id( + &conn, + payment_id, + merchant_id, + attempt_id, + ) .await .map_err(Into::into) .into_report() - } + } - #[instrument(skip_all)] - async fn find_connector_response_by_payment_id_merchant_id_attempt_id( - &self, - payment_id: &str, - merchant_id: &str, - attempt_id: &str, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage::ConnectorResponse::find_by_payment_id_merchant_id_attempt_id( - &conn, - payment_id, - merchant_id, - attempt_id, - ) - .await - .map_err(Into::into) - .into_report() + async fn update_connector_response( + &self, + this: storage_type::ConnectorResponse, + connector_response_update: storage_type::ConnectorResponseUpdate, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + this.update(&conn, connector_response_update) + .await + .map_err(Into::into) + .into_report() + } } +} - async fn update_connector_response( - &self, - this: storage::ConnectorResponse, - connector_response_update: storage::ConnectorResponseUpdate, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - this.update(&conn, connector_response_update) - .await - .map_err(Into::into) - .into_report() +#[cfg(feature = "kv_store")] +mod storage { + + use error_stack::{IntoReport, ResultExt}; + use redis_interface::HsetnxReply; + use router_env::{instrument, tracing}; + use storage_impl::redis::kv_store::{PartitionKey, RedisConnInterface}; + + use super::Store; + use crate::{ + connection, + core::errors::{self, CustomResult}, + types::storage::{self as storage_type, enums, kv}, + utils::db_utils, + }; + + #[async_trait::async_trait] + impl super::ConnectorResponseInterface for Store { + #[instrument(skip_all)] + async fn insert_connector_response( + &self, + connector_response: storage_type::ConnectorResponseNew, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + + match storage_scheme { + data_models::MerchantStorageScheme::PostgresOnly => connector_response + .insert(&conn) + .await + .map_err(Into::into) + .into_report(), + data_models::MerchantStorageScheme::RedisKv => { + let merchant_id = &connector_response.merchant_id; + let payment_id = &connector_response.payment_id; + let attempt_id = &connector_response.attempt_id; + + let key = format!("{merchant_id}_{payment_id}"); + let field = format!("connector_resp_{merchant_id}_{payment_id}_{attempt_id}"); + + let created_connector_resp = storage_type::ConnectorResponse { + id: Default::default(), + payment_id: connector_response.payment_id.clone(), + merchant_id: connector_response.merchant_id.clone(), + attempt_id: connector_response.attempt_id.clone(), + created_at: connector_response.created_at, + modified_at: connector_response.modified_at, + connector_name: connector_response.connector_name.clone(), + connector_transaction_id: connector_response + .connector_transaction_id + .clone(), + authentication_data: connector_response.authentication_data.clone(), + encoded_data: connector_response.encoded_data.clone(), + }; + match self + .get_redis_conn() + .map_err(|er| error_stack::report!(errors::StorageError::RedisError(er)))? + .serialize_and_set_hash_field_if_not_exist( + &key, + &field, + &created_connector_resp, + ) + .await + { + Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { + entity: "address", + key: Some(key), + }) + .into_report(), + Ok(HsetnxReply::KeySet) => { + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: kv::Insertable::ConnectorResponse( + connector_response.clone(), + ), + }, + }; + self.push_to_drainer_stream::( + redis_entry, + PartitionKey::MerchantIdPaymentId { + merchant_id, + payment_id, + }, + ) + .await + .change_context(errors::StorageError::KVError)?; + Ok(created_connector_resp) + } + Err(er) => Err(er).change_context(errors::StorageError::KVError), + } + } + } + } + + #[instrument(skip_all)] + async fn find_connector_response_by_payment_id_merchant_id_attempt_id( + &self, + payment_id: &str, + merchant_id: &str, + attempt_id: &str, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + let database_call = || async { + storage_type::ConnectorResponse::find_by_payment_id_merchant_id_attempt_id( + &conn, + payment_id, + merchant_id, + attempt_id, + ) + .await + .map_err(Into::into) + .into_report() + }; + match storage_scheme { + data_models::MerchantStorageScheme::PostgresOnly => database_call().await, + data_models::MerchantStorageScheme::RedisKv => { + let key = format!("{merchant_id}_{payment_id}"); + let field = format!("connector_resp_{merchant_id}_{payment_id}_{attempt_id}"); + let redis_conn = self + .get_redis_conn() + .map_err(|er| error_stack::report!(errors::StorageError::RedisError(er)))?; + + let redis_fut = redis_conn.get_hash_field_and_deserialize( + &key, + &field, + "ConnectorResponse", + ); + + db_utils::try_redis_get_else_try_database_get(redis_fut, database_call).await + } + } + } + + async fn update_connector_response( + &self, + this: storage_type::ConnectorResponse, + connector_response_update: storage_type::ConnectorResponseUpdate, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + match storage_scheme { + data_models::MerchantStorageScheme::PostgresOnly => this + .update(&conn, connector_response_update) + .await + .map_err(Into::into) + .into_report(), + data_models::MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", this.merchant_id, this.payment_id); + let updated_connector_response = connector_response_update + .clone() + .apply_changeset(this.clone()); + let redis_value = serde_json::to_string(&updated_connector_response) + .into_report() + .change_context(errors::StorageError::KVError)?; + let field = format!( + "connector_resp_{}_{}_{}", + &updated_connector_response.merchant_id, + &updated_connector_response.payment_id, + &updated_connector_response.attempt_id + ); + let updated_connector_response = self + .get_redis_conn() + .map_err(|er| error_stack::report!(errors::StorageError::RedisError(er)))? + .set_hash_fields(&key, (&field, &redis_value)) + .await + .map(|_| updated_connector_response) + .change_context(errors::StorageError::KVError)?; + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Update { + updatable: kv::Updateable::ConnectorResponseUpdate( + kv::ConnectorResponseUpdateMems { + orig: this, + update_data: connector_response_update, + }, + ), + }, + }; + + self.push_to_drainer_stream::( + redis_entry, + PartitionKey::MerchantIdPaymentId { + merchant_id: &updated_connector_response.merchant_id, + payment_id: &updated_connector_response.payment_id, + }, + ) + .await + .change_context(errors::StorageError::KVError)?; + Ok(updated_connector_response) + } + } + } } } @@ -87,11 +295,11 @@ impl ConnectorResponseInterface for MockDb { #[instrument(skip_all)] async fn insert_connector_response( &self, - new: storage::ConnectorResponseNew, + new: storage_type::ConnectorResponseNew, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { let mut connector_response = self.connector_response.lock().await; - let response = storage::ConnectorResponse { + let response = storage_type::ConnectorResponse { id: connector_response .len() .try_into() @@ -118,7 +326,7 @@ impl ConnectorResponseInterface for MockDb { _merchant_id: &str, _attempt_id: &str, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { // [#172]: Implement function for `MockDb` Err(errors::StorageError::MockDbError)? } @@ -127,10 +335,10 @@ impl ConnectorResponseInterface for MockDb { #[allow(clippy::unwrap_used)] async fn update_connector_response( &self, - this: storage::ConnectorResponse, - connector_response_update: storage::ConnectorResponseUpdate, + this: storage_type::ConnectorResponse, + connector_response_update: storage_type::ConnectorResponseUpdate, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { let mut connector_response = self.connector_response.lock().await; let response = connector_response .iter_mut() diff --git a/crates/router/src/types/storage/kv.rs b/crates/router/src/types/storage/kv.rs index c9cdad8a33..51c9eb2b42 100644 --- a/crates/router/src/types/storage/kv.rs +++ b/crates/router/src/types/storage/kv.rs @@ -1,4 +1,4 @@ pub use diesel_models::kv::{ - DBOperation, Insertable, PaymentAttemptUpdateMems, PaymentIntentUpdateMems, RefundUpdateMems, - TypedSql, Updateable, + ConnectorResponseUpdateMems, DBOperation, Insertable, PaymentAttemptUpdateMems, + PaymentIntentUpdateMems, RefundUpdateMems, TypedSql, Updateable, }; diff --git a/crates/storage_impl/src/connector_response.rs b/crates/storage_impl/src/connector_response.rs new file mode 100644 index 0000000000..7d4ff6df94 --- /dev/null +++ b/crates/storage_impl/src/connector_response.rs @@ -0,0 +1,5 @@ +use diesel_models::connector_response::ConnectorResponse; + +use crate::redis::kv_store::KvStorePartition; + +impl KvStorePartition for ConnectorResponse {} diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index f370853735..5f355e7134 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -8,6 +8,7 @@ use redis::{kv_store::RedisConnInterface, RedisStore}; mod address; pub mod config; pub mod connection; +mod connector_response; pub mod database; pub mod errors; mod lookup;